1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{
16 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
17};
18use language::LanguageRegistry;
19use live_kit_client::{
20 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
21 RemoteVideoTrackUpdate,
22};
23use postage::{sink::Sink, stream::Stream, watch};
24use project::Project;
25use settings::Settings;
26use std::{future::Future, mem, sync::Arc, time::Duration};
27use util::{post_inc, ResultExt, TryFutureExt};
28
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
30
31#[derive(Clone, Debug, PartialEq, Eq)]
32pub enum Event {
33 ParticipantLocationChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteVideoTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteAudioTracksChanged {
40 participant_id: proto::PeerId,
41 },
42 RemoteProjectShared {
43 owner: Arc<User>,
44 project_id: u64,
45 worktree_root_names: Vec<String>,
46 },
47 RemoteProjectUnshared {
48 project_id: u64,
49 },
50 RemoteProjectJoined {
51 project_id: u64,
52 },
53 RemoteProjectInvitationDiscarded {
54 project_id: u64,
55 },
56 Left,
57}
58
59pub struct Room {
60 id: u64,
61 channel_id: Option<u64>,
62 live_kit: Option<LiveKitRoom>,
63 status: RoomStatus,
64 shared_projects: HashSet<WeakModel<Project>>,
65 joined_projects: HashSet<WeakModel<Project>>,
66 local_participant: LocalParticipant,
67 remote_participants: BTreeMap<u64, RemoteParticipant>,
68 pending_participants: Vec<Arc<User>>,
69 participant_user_ids: HashSet<u64>,
70 pending_call_count: usize,
71 leave_when_empty: bool,
72 client: Arc<Client>,
73 user_store: Model<UserStore>,
74 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
75 client_subscriptions: Vec<client::Subscription>,
76 _subscriptions: Vec<gpui::Subscription>,
77 room_update_completed_tx: watch::Sender<Option<()>>,
78 room_update_completed_rx: watch::Receiver<Option<()>>,
79 pending_room_update: Option<Task<()>>,
80 maintain_connection: Option<Task<Option<()>>>,
81}
82
83impl EventEmitter for Room {
84 type Event = Event;
85}
86
87impl Room {
88 pub fn channel_id(&self) -> Option<u64> {
89 self.channel_id
90 }
91
92 pub fn is_sharing_project(&self) -> bool {
93 !self.shared_projects.is_empty()
94 }
95
96 #[cfg(any(test, feature = "test-support"))]
97 pub fn is_connected(&self) -> bool {
98 if let Some(live_kit) = self.live_kit.as_ref() {
99 matches!(
100 *live_kit.room.status().borrow(),
101 live_kit_client::ConnectionState::Connected { .. }
102 )
103 } else {
104 false
105 }
106 }
107
108 fn new(
109 id: u64,
110 channel_id: Option<u64>,
111 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
112 client: Arc<Client>,
113 user_store: Model<UserStore>,
114 cx: &mut ModelContext<Self>,
115 ) -> Self {
116 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
117 let room = live_kit_client::Room::new();
118 let mut status = room.status();
119 // Consume the initial status of the room.
120 let _ = status.try_recv();
121 let _maintain_room = cx.spawn(|this, mut cx| async move {
122 while let Some(status) = status.next().await {
123 let this = if let Some(this) = this.upgrade() {
124 this
125 } else {
126 break;
127 };
128
129 if status == live_kit_client::ConnectionState::Disconnected {
130 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
131 .ok();
132 break;
133 }
134 }
135 });
136
137 let _maintain_video_tracks = cx.spawn({
138 let room = room.clone();
139 move |this, mut cx| async move {
140 let mut track_video_changes = room.remote_video_track_updates();
141 while let Some(track_change) = track_video_changes.next().await {
142 let this = if let Some(this) = this.upgrade() {
143 this
144 } else {
145 break;
146 };
147
148 this.update(&mut cx, |this, cx| {
149 this.remote_video_track_updated(track_change, cx).log_err()
150 })
151 .ok();
152 }
153 }
154 });
155
156 let _maintain_audio_tracks = cx.spawn({
157 let room = room.clone();
158 |this, mut cx| async move {
159 let mut track_audio_changes = room.remote_audio_track_updates();
160 while let Some(track_change) = track_audio_changes.next().await {
161 let this = if let Some(this) = this.upgrade() {
162 this
163 } else {
164 break;
165 };
166
167 this.update(&mut cx, |this, cx| {
168 this.remote_audio_track_updated(track_change, cx).log_err()
169 })
170 .ok();
171 }
172 }
173 });
174
175 let connect = room.connect(&connection_info.server_url, &connection_info.token);
176 cx.spawn(|this, mut cx| async move {
177 connect.await?;
178
179 if !cx.update(|cx| Self::mute_on_join(cx))? {
180 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
181 .await?;
182 }
183
184 anyhow::Ok(())
185 })
186 .detach_and_log_err(cx);
187
188 Some(LiveKitRoom {
189 room,
190 screen_track: LocalTrack::None,
191 microphone_track: LocalTrack::None,
192 next_publish_id: 0,
193 muted_by_user: false,
194 deafened: false,
195 speaking: false,
196 _maintain_room,
197 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
198 })
199 } else {
200 None
201 };
202
203 let maintain_connection = cx.spawn({
204 let client = client.clone();
205 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
206 });
207
208 Audio::play_sound(Sound::Joined, cx);
209
210 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
211
212 Self {
213 id,
214 channel_id,
215 live_kit: live_kit_room,
216 status: RoomStatus::Online,
217 shared_projects: Default::default(),
218 joined_projects: Default::default(),
219 participant_user_ids: Default::default(),
220 local_participant: Default::default(),
221 remote_participants: Default::default(),
222 pending_participants: Default::default(),
223 pending_call_count: 0,
224 client_subscriptions: vec![
225 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
226 ],
227 _subscriptions: vec![
228 cx.on_release(Self::released),
229 cx.on_app_quit(Self::app_will_quit),
230 ],
231 leave_when_empty: false,
232 pending_room_update: None,
233 client,
234 user_store,
235 follows_by_leader_id_project_id: Default::default(),
236 maintain_connection: Some(maintain_connection),
237 room_update_completed_tx,
238 room_update_completed_rx,
239 }
240 }
241
242 pub(crate) fn create(
243 called_user_id: u64,
244 initial_project: Option<Model<Project>>,
245 client: Arc<Client>,
246 user_store: Model<UserStore>,
247 cx: &mut AppContext,
248 ) -> Task<Result<Model<Self>>> {
249 cx.spawn(move |mut cx| async move {
250 let response = client.request(proto::CreateRoom {}).await?;
251 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
252 let room = cx.build_model(|cx| {
253 Self::new(
254 room_proto.id,
255 None,
256 response.live_kit_connection_info,
257 client,
258 user_store,
259 cx,
260 )
261 })?;
262
263 let initial_project_id = if let Some(initial_project) = initial_project {
264 let initial_project_id = room
265 .update(&mut cx, |room, cx| {
266 room.share_project(initial_project.clone(), cx)
267 })?
268 .await?;
269 Some(initial_project_id)
270 } else {
271 None
272 };
273
274 match room
275 .update(&mut cx, |room, cx| {
276 room.leave_when_empty = true;
277 room.call(called_user_id, initial_project_id, cx)
278 })?
279 .await
280 {
281 Ok(()) => Ok(room),
282 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
283 }
284 })
285 }
286
287 pub(crate) fn join_channel(
288 channel_id: u64,
289 client: Arc<Client>,
290 user_store: Model<UserStore>,
291 cx: &mut AppContext,
292 ) -> Task<Result<Model<Self>>> {
293 cx.spawn(move |cx| async move {
294 Self::from_join_response(
295 client.request(proto::JoinChannel { channel_id }).await?,
296 client,
297 user_store,
298 cx,
299 )
300 })
301 }
302
303 pub(crate) fn join(
304 call: &IncomingCall,
305 client: Arc<Client>,
306 user_store: Model<UserStore>,
307 cx: &mut AppContext,
308 ) -> Task<Result<Model<Self>>> {
309 let id = call.room_id;
310 cx.spawn(move |cx| async move {
311 Self::from_join_response(
312 client.request(proto::JoinRoom { id }).await?,
313 client,
314 user_store,
315 cx,
316 )
317 })
318 }
319
320 fn released(&mut self, cx: &mut AppContext) {
321 if self.status.is_online() {
322 self.leave_internal(cx).detach_and_log_err(cx);
323 }
324 }
325
326 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
327 let task = if self.status.is_online() {
328 let leave = self.leave_internal(cx);
329 Some(cx.background_executor().spawn(async move {
330 leave.await.log_err();
331 }))
332 } else {
333 None
334 };
335
336 async move {
337 if let Some(task) = task {
338 task.await;
339 }
340 }
341 }
342
343 pub fn mute_on_join(cx: &AppContext) -> bool {
344 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
345 }
346
347 fn from_join_response(
348 response: proto::JoinRoomResponse,
349 client: Arc<Client>,
350 user_store: Model<UserStore>,
351 mut cx: AsyncAppContext,
352 ) -> Result<Model<Self>> {
353 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
354 let room = cx.build_model(|cx| {
355 Self::new(
356 room_proto.id,
357 response.channel_id,
358 response.live_kit_connection_info,
359 client,
360 user_store,
361 cx,
362 )
363 })?;
364 room.update(&mut cx, |room, cx| {
365 room.leave_when_empty = room.channel_id.is_none();
366 room.apply_room_update(room_proto, cx)?;
367 anyhow::Ok(())
368 })??;
369 Ok(room)
370 }
371
372 fn should_leave(&self) -> bool {
373 self.leave_when_empty
374 && self.pending_room_update.is_none()
375 && self.pending_participants.is_empty()
376 && self.remote_participants.is_empty()
377 && self.pending_call_count == 0
378 }
379
380 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
381 cx.notify();
382 cx.emit(Event::Left);
383 self.leave_internal(cx)
384 }
385
386 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
387 if self.status.is_offline() {
388 return Task::ready(Err(anyhow!("room is offline")));
389 }
390
391 log::info!("leaving room");
392 Audio::play_sound(Sound::Leave, cx);
393
394 self.clear_state(cx);
395
396 let leave_room = self.client.request(proto::LeaveRoom {});
397 cx.background_executor().spawn(async move {
398 leave_room.await?;
399 anyhow::Ok(())
400 })
401 }
402
403 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
404 for project in self.shared_projects.drain() {
405 if let Some(project) = project.upgrade() {
406 project.update(cx, |project, cx| {
407 project.unshare(cx).log_err();
408 });
409 }
410 }
411 for project in self.joined_projects.drain() {
412 if let Some(project) = project.upgrade() {
413 project.update(cx, |project, cx| {
414 project.disconnected_from_host(cx);
415 project.close(cx);
416 });
417 }
418 }
419
420 self.status = RoomStatus::Offline;
421 self.remote_participants.clear();
422 self.pending_participants.clear();
423 self.participant_user_ids.clear();
424 self.client_subscriptions.clear();
425 self.live_kit.take();
426 self.pending_room_update.take();
427 self.maintain_connection.take();
428 }
429
430 async fn maintain_connection(
431 this: WeakModel<Self>,
432 client: Arc<Client>,
433 mut cx: AsyncAppContext,
434 ) -> Result<()> {
435 let mut client_status = client.status();
436 loop {
437 let _ = client_status.try_recv();
438 let is_connected = client_status.borrow().is_connected();
439 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
440 if !is_connected || client_status.next().await.is_some() {
441 log::info!("detected client disconnection");
442
443 this.upgrade()
444 .ok_or_else(|| anyhow!("room was dropped"))?
445 .update(&mut cx, |this, cx| {
446 this.status = RoomStatus::Rejoining;
447 cx.notify();
448 })?;
449
450 // Wait for client to re-establish a connection to the server.
451 {
452 let mut reconnection_timeout =
453 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
454 let client_reconnection = async {
455 let mut remaining_attempts = 3;
456 while remaining_attempts > 0 {
457 if client_status.borrow().is_connected() {
458 log::info!("client reconnected, attempting to rejoin room");
459
460 let Some(this) = this.upgrade() else { break };
461 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
462 Ok(task) => {
463 if task.await.log_err().is_some() {
464 return true;
465 } else {
466 remaining_attempts -= 1;
467 }
468 }
469 Err(_app_dropped) => return false,
470 }
471 } else if client_status.borrow().is_signed_out() {
472 return false;
473 }
474
475 log::info!(
476 "waiting for client status change, remaining attempts {}",
477 remaining_attempts
478 );
479 client_status.next().await;
480 }
481 false
482 }
483 .fuse();
484 futures::pin_mut!(client_reconnection);
485
486 futures::select_biased! {
487 reconnected = client_reconnection => {
488 if reconnected {
489 log::info!("successfully reconnected to room");
490 // If we successfully joined the room, go back around the loop
491 // waiting for future connection status changes.
492 continue;
493 }
494 }
495 _ = reconnection_timeout => {
496 log::info!("room reconnection timeout expired");
497 }
498 }
499 }
500
501 break;
502 }
503 }
504
505 // The client failed to re-establish a connection to the server
506 // or an error occurred while trying to re-join the room. Either way
507 // we leave the room and return an error.
508 if let Some(this) = this.upgrade() {
509 log::info!("reconnection failed, leaving room");
510 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
511 }
512 Err(anyhow!(
513 "can't reconnect to room: client failed to re-establish connection"
514 ))
515 }
516
517 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
518 let mut projects = HashMap::default();
519 let mut reshared_projects = Vec::new();
520 let mut rejoined_projects = Vec::new();
521 self.shared_projects.retain(|project| {
522 if let Some(handle) = project.upgrade() {
523 let project = handle.read(cx);
524 if let Some(project_id) = project.remote_id() {
525 projects.insert(project_id, handle.clone());
526 reshared_projects.push(proto::UpdateProject {
527 project_id,
528 worktrees: project.worktree_metadata_protos(cx),
529 });
530 return true;
531 }
532 }
533 false
534 });
535 self.joined_projects.retain(|project| {
536 if let Some(handle) = project.upgrade() {
537 let project = handle.read(cx);
538 if let Some(project_id) = project.remote_id() {
539 projects.insert(project_id, handle.clone());
540 rejoined_projects.push(proto::RejoinProject {
541 id: project_id,
542 worktrees: project
543 .worktrees()
544 .map(|worktree| {
545 let worktree = worktree.read(cx);
546 proto::RejoinWorktree {
547 id: worktree.id().to_proto(),
548 scan_id: worktree.completed_scan_id() as u64,
549 }
550 })
551 .collect(),
552 });
553 }
554 return true;
555 }
556 false
557 });
558
559 let response = self.client.request_envelope(proto::RejoinRoom {
560 id: self.id,
561 reshared_projects,
562 rejoined_projects,
563 });
564
565 cx.spawn(|this, mut cx| async move {
566 let response = response.await?;
567 let message_id = response.message_id;
568 let response = response.payload;
569 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
570 this.update(&mut cx, |this, cx| {
571 this.status = RoomStatus::Online;
572 this.apply_room_update(room_proto, cx)?;
573
574 for reshared_project in response.reshared_projects {
575 if let Some(project) = projects.get(&reshared_project.id) {
576 project.update(cx, |project, cx| {
577 project.reshared(reshared_project, cx).log_err();
578 });
579 }
580 }
581
582 for rejoined_project in response.rejoined_projects {
583 if let Some(project) = projects.get(&rejoined_project.id) {
584 project.update(cx, |project, cx| {
585 project.rejoined(rejoined_project, message_id, cx).log_err();
586 });
587 }
588 }
589
590 anyhow::Ok(())
591 })?
592 })
593 }
594
595 pub fn id(&self) -> u64 {
596 self.id
597 }
598
599 pub fn status(&self) -> RoomStatus {
600 self.status
601 }
602
603 pub fn local_participant(&self) -> &LocalParticipant {
604 &self.local_participant
605 }
606
607 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
608 &self.remote_participants
609 }
610
611 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
612 self.remote_participants
613 .values()
614 .find(|p| p.peer_id == peer_id)
615 }
616
617 pub fn pending_participants(&self) -> &[Arc<User>] {
618 &self.pending_participants
619 }
620
621 pub fn contains_participant(&self, user_id: u64) -> bool {
622 self.participant_user_ids.contains(&user_id)
623 }
624
625 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
626 self.follows_by_leader_id_project_id
627 .get(&(leader_id, project_id))
628 .map_or(&[], |v| v.as_slice())
629 }
630
631 /// Returns the most 'active' projects, defined as most people in the project
632 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
633 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
634 for participant in self.remote_participants.values() {
635 match participant.location {
636 ParticipantLocation::SharedProject { project_id } => {
637 project_hosts_and_guest_counts
638 .entry(project_id)
639 .or_default()
640 .1 += 1;
641 }
642 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
643 }
644 for project in &participant.projects {
645 project_hosts_and_guest_counts
646 .entry(project.id)
647 .or_default()
648 .0 = Some(participant.user.id);
649 }
650 }
651
652 if let Some(user) = self.user_store.read(cx).current_user() {
653 for project in &self.local_participant.projects {
654 project_hosts_and_guest_counts
655 .entry(project.id)
656 .or_default()
657 .0 = Some(user.id);
658 }
659 }
660
661 project_hosts_and_guest_counts
662 .into_iter()
663 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
664 .max_by_key(|(_, _, guest_count)| *guest_count)
665 .map(|(id, host, _)| (id, host))
666 }
667
668 async fn handle_room_updated(
669 this: Model<Self>,
670 envelope: TypedEnvelope<proto::RoomUpdated>,
671 _: Arc<Client>,
672 mut cx: AsyncAppContext,
673 ) -> Result<()> {
674 let room = envelope
675 .payload
676 .room
677 .ok_or_else(|| anyhow!("invalid room"))?;
678 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
679 }
680
681 fn apply_room_update(
682 &mut self,
683 mut room: proto::Room,
684 cx: &mut ModelContext<Self>,
685 ) -> Result<()> {
686 // Filter ourselves out from the room's participants.
687 let local_participant_ix = room
688 .participants
689 .iter()
690 .position(|participant| Some(participant.user_id) == self.client.user_id());
691 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
692
693 let pending_participant_user_ids = room
694 .pending_participants
695 .iter()
696 .map(|p| p.user_id)
697 .collect::<Vec<_>>();
698
699 let remote_participant_user_ids = room
700 .participants
701 .iter()
702 .map(|p| p.user_id)
703 .collect::<Vec<_>>();
704
705 let (remote_participants, pending_participants) =
706 self.user_store.update(cx, move |user_store, cx| {
707 (
708 user_store.get_users(remote_participant_user_ids, cx),
709 user_store.get_users(pending_participant_user_ids, cx),
710 )
711 });
712
713 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
714 let (remote_participants, pending_participants) =
715 futures::join!(remote_participants, pending_participants);
716
717 this.update(&mut cx, |this, cx| {
718 this.participant_user_ids.clear();
719
720 if let Some(participant) = local_participant {
721 this.local_participant.projects = participant.projects;
722 } else {
723 this.local_participant.projects.clear();
724 }
725
726 if let Some(participants) = remote_participants.log_err() {
727 for (participant, user) in room.participants.into_iter().zip(participants) {
728 let Some(peer_id) = participant.peer_id else {
729 continue;
730 };
731 let participant_index = ParticipantIndex(participant.participant_index);
732 this.participant_user_ids.insert(participant.user_id);
733
734 let old_projects = this
735 .remote_participants
736 .get(&participant.user_id)
737 .into_iter()
738 .flat_map(|existing| &existing.projects)
739 .map(|project| project.id)
740 .collect::<HashSet<_>>();
741 let new_projects = participant
742 .projects
743 .iter()
744 .map(|project| project.id)
745 .collect::<HashSet<_>>();
746
747 for project in &participant.projects {
748 if !old_projects.contains(&project.id) {
749 cx.emit(Event::RemoteProjectShared {
750 owner: user.clone(),
751 project_id: project.id,
752 worktree_root_names: project.worktree_root_names.clone(),
753 });
754 }
755 }
756
757 for unshared_project_id in old_projects.difference(&new_projects) {
758 this.joined_projects.retain(|project| {
759 if let Some(project) = project.upgrade() {
760 project.update(cx, |project, cx| {
761 if project.remote_id() == Some(*unshared_project_id) {
762 project.disconnected_from_host(cx);
763 false
764 } else {
765 true
766 }
767 })
768 } else {
769 false
770 }
771 });
772 cx.emit(Event::RemoteProjectUnshared {
773 project_id: *unshared_project_id,
774 });
775 }
776
777 let location = ParticipantLocation::from_proto(participant.location)
778 .unwrap_or(ParticipantLocation::External);
779 if let Some(remote_participant) =
780 this.remote_participants.get_mut(&participant.user_id)
781 {
782 remote_participant.peer_id = peer_id;
783 remote_participant.projects = participant.projects;
784 remote_participant.participant_index = participant_index;
785 if location != remote_participant.location {
786 remote_participant.location = location;
787 cx.emit(Event::ParticipantLocationChanged {
788 participant_id: peer_id,
789 });
790 }
791 } else {
792 this.remote_participants.insert(
793 participant.user_id,
794 RemoteParticipant {
795 user: user.clone(),
796 participant_index,
797 peer_id,
798 projects: participant.projects,
799 location,
800 muted: true,
801 speaking: false,
802 video_tracks: Default::default(),
803 audio_tracks: Default::default(),
804 },
805 );
806
807 Audio::play_sound(Sound::Joined, cx);
808
809 if let Some(live_kit) = this.live_kit.as_ref() {
810 let video_tracks =
811 live_kit.room.remote_video_tracks(&user.id.to_string());
812 let audio_tracks =
813 live_kit.room.remote_audio_tracks(&user.id.to_string());
814 let publications = live_kit
815 .room
816 .remote_audio_track_publications(&user.id.to_string());
817
818 for track in video_tracks {
819 this.remote_video_track_updated(
820 RemoteVideoTrackUpdate::Subscribed(track),
821 cx,
822 )
823 .log_err();
824 }
825
826 for (track, publication) in
827 audio_tracks.iter().zip(publications.iter())
828 {
829 this.remote_audio_track_updated(
830 RemoteAudioTrackUpdate::Subscribed(
831 track.clone(),
832 publication.clone(),
833 ),
834 cx,
835 )
836 .log_err();
837 }
838 }
839 }
840 }
841
842 this.remote_participants.retain(|user_id, participant| {
843 if this.participant_user_ids.contains(user_id) {
844 true
845 } else {
846 for project in &participant.projects {
847 cx.emit(Event::RemoteProjectUnshared {
848 project_id: project.id,
849 });
850 }
851 false
852 }
853 });
854 }
855
856 if let Some(pending_participants) = pending_participants.log_err() {
857 this.pending_participants = pending_participants;
858 for participant in &this.pending_participants {
859 this.participant_user_ids.insert(participant.id);
860 }
861 }
862
863 this.follows_by_leader_id_project_id.clear();
864 for follower in room.followers {
865 let project_id = follower.project_id;
866 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
867 (Some(leader), Some(follower)) => (leader, follower),
868
869 _ => {
870 log::error!("Follower message {follower:?} missing some state");
871 continue;
872 }
873 };
874
875 let list = this
876 .follows_by_leader_id_project_id
877 .entry((leader, project_id))
878 .or_insert(Vec::new());
879 if !list.contains(&follower) {
880 list.push(follower);
881 }
882 }
883
884 this.pending_room_update.take();
885 if this.should_leave() {
886 log::info!("room is empty, leaving");
887 let _ = this.leave(cx);
888 }
889
890 this.user_store.update(cx, |user_store, cx| {
891 let participant_indices_by_user_id = this
892 .remote_participants
893 .iter()
894 .map(|(user_id, participant)| (*user_id, participant.participant_index))
895 .collect();
896 user_store.set_participant_indices(participant_indices_by_user_id, cx);
897 });
898
899 this.check_invariants();
900 this.room_update_completed_tx.try_send(Some(())).ok();
901 cx.notify();
902 })
903 .ok();
904 }));
905
906 cx.notify();
907 Ok(())
908 }
909
910 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
911 let mut done_rx = self.room_update_completed_rx.clone();
912 async move {
913 while let Some(result) = done_rx.next().await {
914 if result.is_some() {
915 break;
916 }
917 }
918 }
919 }
920
921 fn remote_video_track_updated(
922 &mut self,
923 change: RemoteVideoTrackUpdate,
924 cx: &mut ModelContext<Self>,
925 ) -> Result<()> {
926 match change {
927 RemoteVideoTrackUpdate::Subscribed(track) => {
928 let user_id = track.publisher_id().parse()?;
929 let track_id = track.sid().to_string();
930 let participant = self
931 .remote_participants
932 .get_mut(&user_id)
933 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
934 participant.video_tracks.insert(track_id.clone(), track);
935 cx.emit(Event::RemoteVideoTracksChanged {
936 participant_id: participant.peer_id,
937 });
938 }
939 RemoteVideoTrackUpdate::Unsubscribed {
940 publisher_id,
941 track_id,
942 } => {
943 let user_id = publisher_id.parse()?;
944 let participant = self
945 .remote_participants
946 .get_mut(&user_id)
947 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
948 participant.video_tracks.remove(&track_id);
949 cx.emit(Event::RemoteVideoTracksChanged {
950 participant_id: participant.peer_id,
951 });
952 }
953 }
954
955 cx.notify();
956 Ok(())
957 }
958
959 fn remote_audio_track_updated(
960 &mut self,
961 change: RemoteAudioTrackUpdate,
962 cx: &mut ModelContext<Self>,
963 ) -> Result<()> {
964 match change {
965 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
966 let mut speaker_ids = speakers
967 .into_iter()
968 .filter_map(|speaker_sid| speaker_sid.parse().ok())
969 .collect::<Vec<u64>>();
970 speaker_ids.sort_unstable();
971 for (sid, participant) in &mut self.remote_participants {
972 if let Ok(_) = speaker_ids.binary_search(sid) {
973 participant.speaking = true;
974 } else {
975 participant.speaking = false;
976 }
977 }
978 if let Some(id) = self.client.user_id() {
979 if let Some(room) = &mut self.live_kit {
980 if let Ok(_) = speaker_ids.binary_search(&id) {
981 room.speaking = true;
982 } else {
983 room.speaking = false;
984 }
985 }
986 }
987 cx.notify();
988 }
989 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
990 let mut found = false;
991 for participant in &mut self.remote_participants.values_mut() {
992 for track in participant.audio_tracks.values() {
993 if track.sid() == track_id {
994 found = true;
995 break;
996 }
997 }
998 if found {
999 participant.muted = muted;
1000 break;
1001 }
1002 }
1003
1004 cx.notify();
1005 }
1006 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1007 let user_id = track.publisher_id().parse()?;
1008 let track_id = track.sid().to_string();
1009 let participant = self
1010 .remote_participants
1011 .get_mut(&user_id)
1012 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1013 participant.audio_tracks.insert(track_id.clone(), track);
1014 participant.muted = publication.is_muted();
1015
1016 cx.emit(Event::RemoteAudioTracksChanged {
1017 participant_id: participant.peer_id,
1018 });
1019 }
1020 RemoteAudioTrackUpdate::Unsubscribed {
1021 publisher_id,
1022 track_id,
1023 } => {
1024 let user_id = publisher_id.parse()?;
1025 let participant = self
1026 .remote_participants
1027 .get_mut(&user_id)
1028 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1029 participant.audio_tracks.remove(&track_id);
1030 cx.emit(Event::RemoteAudioTracksChanged {
1031 participant_id: participant.peer_id,
1032 });
1033 }
1034 }
1035
1036 cx.notify();
1037 Ok(())
1038 }
1039
1040 fn check_invariants(&self) {
1041 #[cfg(any(test, feature = "test-support"))]
1042 {
1043 for participant in self.remote_participants.values() {
1044 assert!(self.participant_user_ids.contains(&participant.user.id));
1045 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1046 }
1047
1048 for participant in &self.pending_participants {
1049 assert!(self.participant_user_ids.contains(&participant.id));
1050 assert_ne!(participant.id, self.client.user_id().unwrap());
1051 }
1052
1053 assert_eq!(
1054 self.participant_user_ids.len(),
1055 self.remote_participants.len() + self.pending_participants.len()
1056 );
1057 }
1058 }
1059
1060 pub(crate) fn call(
1061 &mut self,
1062 called_user_id: u64,
1063 initial_project_id: Option<u64>,
1064 cx: &mut ModelContext<Self>,
1065 ) -> Task<Result<()>> {
1066 if self.status.is_offline() {
1067 return Task::ready(Err(anyhow!("room is offline")));
1068 }
1069
1070 cx.notify();
1071 let client = self.client.clone();
1072 let room_id = self.id;
1073 self.pending_call_count += 1;
1074 cx.spawn(move |this, mut cx| async move {
1075 let result = client
1076 .request(proto::Call {
1077 room_id,
1078 called_user_id,
1079 initial_project_id,
1080 })
1081 .await;
1082 this.update(&mut cx, |this, cx| {
1083 this.pending_call_count -= 1;
1084 if this.should_leave() {
1085 this.leave(cx).detach_and_log_err(cx);
1086 }
1087 })?;
1088 result?;
1089 Ok(())
1090 })
1091 }
1092
1093 pub fn join_project(
1094 &mut self,
1095 id: u64,
1096 language_registry: Arc<LanguageRegistry>,
1097 fs: Arc<dyn Fs>,
1098 cx: &mut ModelContext<Self>,
1099 ) -> Task<Result<Model<Project>>> {
1100 let client = self.client.clone();
1101 let user_store = self.user_store.clone();
1102 cx.emit(Event::RemoteProjectJoined { project_id: id });
1103 cx.spawn(move |this, mut cx| async move {
1104 let project =
1105 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1106
1107 this.update(&mut cx, |this, cx| {
1108 this.joined_projects.retain(|project| {
1109 if let Some(project) = project.upgrade() {
1110 !project.read(cx).is_read_only()
1111 } else {
1112 false
1113 }
1114 });
1115 this.joined_projects.insert(project.downgrade());
1116 })?;
1117 Ok(project)
1118 })
1119 }
1120
1121 pub(crate) fn share_project(
1122 &mut self,
1123 project: Model<Project>,
1124 cx: &mut ModelContext<Self>,
1125 ) -> Task<Result<u64>> {
1126 if let Some(project_id) = project.read(cx).remote_id() {
1127 return Task::ready(Ok(project_id));
1128 }
1129
1130 let request = self.client.request(proto::ShareProject {
1131 room_id: self.id(),
1132 worktrees: project.read(cx).worktree_metadata_protos(cx),
1133 });
1134 cx.spawn(|this, mut cx| async move {
1135 let response = request.await?;
1136
1137 project.update(&mut cx, |project, cx| {
1138 project.shared(response.project_id, cx)
1139 })??;
1140
1141 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1142 this.update(&mut cx, |this, cx| {
1143 this.shared_projects.insert(project.downgrade());
1144 let active_project = this.local_participant.active_project.as_ref();
1145 if active_project.map_or(false, |location| *location == project) {
1146 this.set_location(Some(&project), cx)
1147 } else {
1148 Task::ready(Ok(()))
1149 }
1150 })?
1151 .await?;
1152
1153 Ok(response.project_id)
1154 })
1155 }
1156
1157 pub(crate) fn unshare_project(
1158 &mut self,
1159 project: Model<Project>,
1160 cx: &mut ModelContext<Self>,
1161 ) -> Result<()> {
1162 let project_id = match project.read(cx).remote_id() {
1163 Some(project_id) => project_id,
1164 None => return Ok(()),
1165 };
1166
1167 self.client.send(proto::UnshareProject { project_id })?;
1168 project.update(cx, |this, cx| this.unshare(cx))
1169 }
1170
1171 pub(crate) fn set_location(
1172 &mut self,
1173 project: Option<&Model<Project>>,
1174 cx: &mut ModelContext<Self>,
1175 ) -> Task<Result<()>> {
1176 if self.status.is_offline() {
1177 return Task::ready(Err(anyhow!("room is offline")));
1178 }
1179
1180 let client = self.client.clone();
1181 let room_id = self.id;
1182 let location = if let Some(project) = project {
1183 self.local_participant.active_project = Some(project.downgrade());
1184 if let Some(project_id) = project.read(cx).remote_id() {
1185 proto::participant_location::Variant::SharedProject(
1186 proto::participant_location::SharedProject { id: project_id },
1187 )
1188 } else {
1189 proto::participant_location::Variant::UnsharedProject(
1190 proto::participant_location::UnsharedProject {},
1191 )
1192 }
1193 } else {
1194 self.local_participant.active_project = None;
1195 proto::participant_location::Variant::External(proto::participant_location::External {})
1196 };
1197
1198 cx.notify();
1199 cx.background_executor().spawn(async move {
1200 client
1201 .request(proto::UpdateParticipantLocation {
1202 room_id,
1203 location: Some(proto::ParticipantLocation {
1204 variant: Some(location),
1205 }),
1206 })
1207 .await?;
1208 Ok(())
1209 })
1210 }
1211
1212 pub fn is_screen_sharing(&self) -> bool {
1213 self.live_kit.as_ref().map_or(false, |live_kit| {
1214 !matches!(live_kit.screen_track, LocalTrack::None)
1215 })
1216 }
1217
1218 pub fn is_sharing_mic(&self) -> bool {
1219 self.live_kit.as_ref().map_or(false, |live_kit| {
1220 !matches!(live_kit.microphone_track, LocalTrack::None)
1221 })
1222 }
1223
1224 pub fn is_muted(&self, cx: &AppContext) -> bool {
1225 self.live_kit
1226 .as_ref()
1227 .and_then(|live_kit| match &live_kit.microphone_track {
1228 LocalTrack::None => Some(Self::mute_on_join(cx)),
1229 LocalTrack::Pending { muted, .. } => Some(*muted),
1230 LocalTrack::Published { muted, .. } => Some(*muted),
1231 })
1232 .unwrap_or(false)
1233 }
1234
1235 pub fn is_speaking(&self) -> bool {
1236 self.live_kit
1237 .as_ref()
1238 .map_or(false, |live_kit| live_kit.speaking)
1239 }
1240
1241 pub fn is_deafened(&self) -> Option<bool> {
1242 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1243 }
1244
1245 #[track_caller]
1246 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1247 if self.status.is_offline() {
1248 return Task::ready(Err(anyhow!("room is offline")));
1249 } else if self.is_sharing_mic() {
1250 return Task::ready(Err(anyhow!("microphone was already shared")));
1251 }
1252
1253 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1254 let publish_id = post_inc(&mut live_kit.next_publish_id);
1255 live_kit.microphone_track = LocalTrack::Pending {
1256 publish_id,
1257 muted: false,
1258 };
1259 cx.notify();
1260 publish_id
1261 } else {
1262 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1263 };
1264
1265 cx.spawn(move |this, mut cx| async move {
1266 let publish_track = async {
1267 let track = LocalAudioTrack::create();
1268 this.upgrade()
1269 .ok_or_else(|| anyhow!("room was dropped"))?
1270 .update(&mut cx, |this, _| {
1271 this.live_kit
1272 .as_ref()
1273 .map(|live_kit| live_kit.room.publish_audio_track(track))
1274 })?
1275 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1276 .await
1277 };
1278
1279 let publication = publish_track.await;
1280 this.upgrade()
1281 .ok_or_else(|| anyhow!("room was dropped"))?
1282 .update(&mut cx, |this, cx| {
1283 let live_kit = this
1284 .live_kit
1285 .as_mut()
1286 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1287
1288 let (canceled, muted) = if let LocalTrack::Pending {
1289 publish_id: cur_publish_id,
1290 muted,
1291 } = &live_kit.microphone_track
1292 {
1293 (*cur_publish_id != publish_id, *muted)
1294 } else {
1295 (true, false)
1296 };
1297
1298 match publication {
1299 Ok(publication) => {
1300 if canceled {
1301 live_kit.room.unpublish_track(publication);
1302 } else {
1303 if muted {
1304 cx.background_executor()
1305 .spawn(publication.set_mute(muted))
1306 .detach();
1307 }
1308 live_kit.microphone_track = LocalTrack::Published {
1309 track_publication: publication,
1310 muted,
1311 };
1312 cx.notify();
1313 }
1314 Ok(())
1315 }
1316 Err(error) => {
1317 if canceled {
1318 Ok(())
1319 } else {
1320 live_kit.microphone_track = LocalTrack::None;
1321 cx.notify();
1322 Err(error)
1323 }
1324 }
1325 }
1326 })?
1327 })
1328 }
1329
1330 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1331 if self.status.is_offline() {
1332 return Task::ready(Err(anyhow!("room is offline")));
1333 } else if self.is_screen_sharing() {
1334 return Task::ready(Err(anyhow!("screen was already shared")));
1335 }
1336
1337 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1338 let publish_id = post_inc(&mut live_kit.next_publish_id);
1339 live_kit.screen_track = LocalTrack::Pending {
1340 publish_id,
1341 muted: false,
1342 };
1343 cx.notify();
1344 (live_kit.room.display_sources(), publish_id)
1345 } else {
1346 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1347 };
1348
1349 cx.spawn(move |this, mut cx| async move {
1350 let publish_track = async {
1351 let displays = displays.await?;
1352 let display = displays
1353 .first()
1354 .ok_or_else(|| anyhow!("no display found"))?;
1355 let track = LocalVideoTrack::screen_share_for_display(&display);
1356 this.upgrade()
1357 .ok_or_else(|| anyhow!("room was dropped"))?
1358 .update(&mut cx, |this, _| {
1359 this.live_kit
1360 .as_ref()
1361 .map(|live_kit| live_kit.room.publish_video_track(track))
1362 })?
1363 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1364 .await
1365 };
1366
1367 let publication = publish_track.await;
1368 this.upgrade()
1369 .ok_or_else(|| anyhow!("room was dropped"))?
1370 .update(&mut cx, |this, cx| {
1371 let live_kit = this
1372 .live_kit
1373 .as_mut()
1374 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1375
1376 let (canceled, muted) = if let LocalTrack::Pending {
1377 publish_id: cur_publish_id,
1378 muted,
1379 } = &live_kit.screen_track
1380 {
1381 (*cur_publish_id != publish_id, *muted)
1382 } else {
1383 (true, false)
1384 };
1385
1386 match publication {
1387 Ok(publication) => {
1388 if canceled {
1389 live_kit.room.unpublish_track(publication);
1390 } else {
1391 if muted {
1392 cx.background_executor()
1393 .spawn(publication.set_mute(muted))
1394 .detach();
1395 }
1396 live_kit.screen_track = LocalTrack::Published {
1397 track_publication: publication,
1398 muted,
1399 };
1400 cx.notify();
1401 }
1402
1403 Audio::play_sound(Sound::StartScreenshare, cx);
1404
1405 Ok(())
1406 }
1407 Err(error) => {
1408 if canceled {
1409 Ok(())
1410 } else {
1411 live_kit.screen_track = LocalTrack::None;
1412 cx.notify();
1413 Err(error)
1414 }
1415 }
1416 }
1417 })?
1418 })
1419 }
1420
1421 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1422 let should_mute = !self.is_muted(cx);
1423 if let Some(live_kit) = self.live_kit.as_mut() {
1424 if matches!(live_kit.microphone_track, LocalTrack::None) {
1425 return Ok(self.share_microphone(cx));
1426 }
1427
1428 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1429 live_kit.muted_by_user = should_mute;
1430
1431 if old_muted == true && live_kit.deafened == true {
1432 if let Some(task) = self.toggle_deafen(cx).ok() {
1433 task.detach();
1434 }
1435 }
1436
1437 Ok(ret_task)
1438 } else {
1439 Err(anyhow!("LiveKit not started"))
1440 }
1441 }
1442
1443 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1444 if let Some(live_kit) = self.live_kit.as_mut() {
1445 (*live_kit).deafened = !live_kit.deafened;
1446
1447 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1448 // Context notification is sent within set_mute itself.
1449 let mut mute_task = None;
1450 // When deafening, mute user's mic as well.
1451 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1452 if live_kit.deafened || !live_kit.muted_by_user {
1453 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1454 };
1455 for participant in self.remote_participants.values() {
1456 for track in live_kit
1457 .room
1458 .remote_audio_track_publications(&participant.user.id.to_string())
1459 {
1460 let deafened = live_kit.deafened;
1461 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1462 }
1463 }
1464
1465 Ok(cx.foreground_executor().spawn(async move {
1466 if let Some(mute_task) = mute_task {
1467 mute_task.await?;
1468 }
1469 for task in tasks {
1470 task.await?;
1471 }
1472 Ok(())
1473 }))
1474 } else {
1475 Err(anyhow!("LiveKit not started"))
1476 }
1477 }
1478
1479 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1480 if self.status.is_offline() {
1481 return Err(anyhow!("room is offline"));
1482 }
1483
1484 let live_kit = self
1485 .live_kit
1486 .as_mut()
1487 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1488 match mem::take(&mut live_kit.screen_track) {
1489 LocalTrack::None => Err(anyhow!("screen was not shared")),
1490 LocalTrack::Pending { .. } => {
1491 cx.notify();
1492 Ok(())
1493 }
1494 LocalTrack::Published {
1495 track_publication, ..
1496 } => {
1497 live_kit.room.unpublish_track(track_publication);
1498 cx.notify();
1499
1500 Audio::play_sound(Sound::StopScreenshare, cx);
1501 Ok(())
1502 }
1503 }
1504 }
1505
/// Test helper: replaces the display sources reported by the live-kit room.
///
/// # Panics
///
/// Panics when live-kit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1514}
1515
1516struct LiveKitRoom {
1517 room: Arc<live_kit_client::Room>,
1518 screen_track: LocalTrack,
1519 microphone_track: LocalTrack,
1520 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1521 muted_by_user: bool,
1522 deafened: bool,
1523 speaking: bool,
1524 next_publish_id: usize,
1525 _maintain_room: Task<()>,
1526 _maintain_tracks: [Task<()>; 2],
1527}
1528
1529impl LiveKitRoom {
1530 fn set_mute(
1531 self: &mut LiveKitRoom,
1532 should_mute: bool,
1533 cx: &mut ModelContext<Room>,
1534 ) -> Result<(Task<Result<()>>, bool)> {
1535 if !should_mute {
1536 // clear user muting state.
1537 self.muted_by_user = false;
1538 }
1539
1540 let (result, old_muted) = match &mut self.microphone_track {
1541 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1542 LocalTrack::Pending { muted, .. } => {
1543 let old_muted = *muted;
1544 *muted = should_mute;
1545 cx.notify();
1546 Ok((Task::Ready(Some(Ok(()))), old_muted))
1547 }
1548 LocalTrack::Published {
1549 track_publication,
1550 muted,
1551 } => {
1552 let old_muted = *muted;
1553 *muted = should_mute;
1554 cx.notify();
1555 Ok((
1556 cx.background_executor()
1557 .spawn(track_publication.set_mute(*muted)),
1558 old_muted,
1559 ))
1560 }
1561 }?;
1562
1563 if old_muted != should_mute {
1564 if should_mute {
1565 Audio::play_sound(Sound::Mute, cx);
1566 } else {
1567 Audio::play_sound(Sound::Unmute, cx);
1568 }
1569 }
1570
1571 Ok((result, old_muted))
1572 }
1573}
1574
1575enum LocalTrack {
1576 None,
1577 Pending {
1578 publish_id: usize,
1579 muted: bool,
1580 },
1581 Published {
1582 track_publication: LocalTrackPublication,
1583 muted: bool,
1584 },
1585}
1586
1587impl Default for LocalTrack {
1588 fn default() -> Self {
1589 Self::None
1590 }
1591}
1592
/// Connection state of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    /// The room is trying to rejoin.
    Rejoining,
    /// The room is offline; operations that need a connection are rejected.
    Offline,
}
1599
1600impl RoomStatus {
1601 pub fn is_offline(&self) -> bool {
1602 matches!(self, RoomStatus::Offline)
1603 }
1604
1605 pub fn is_online(&self) -> bool {
1606 matches!(self, RoomStatus::Online)
1607 }
1608}