1#![allow(dead_code, unused)]
2// todo!()
3
4use crate::{
5 call_settings::CallSettings,
6 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
7 IncomingCall,
8};
9use anyhow::{anyhow, Result};
10use audio2::{Audio, Sound};
11use client2::{
12 proto::{self, PeerId},
13 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
14};
15use collections::{BTreeMap, HashMap, HashSet};
16use fs2::Fs;
17use futures::{FutureExt, StreamExt};
18use gpui2::{
19 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
20};
21use language2::LanguageRegistry;
22use live_kit_client::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate};
23use postage::{sink::Sink, stream::Stream, watch};
24use project2::Project;
25use settings2::Settings;
26use std::{future::Future, sync::Arc, time::Duration};
27use util::{ResultExt, TryFutureExt};
28
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room after a disconnection before it gives up.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
30
/// Events emitted by a [`Room`] (see its `EventEmitter` implementation).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A remote participant's location differs from the one previously recorded
    /// for them (emitted while applying a room update).
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's video track was subscribed or unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's audio tracks changed.
    // NOTE(review): in the visible code this is only emitted from commented-out
    // sections of `remote_audio_track_updated`.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not
    /// among that participant's previously known projects.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously known remote project is no longer listed by its owner, or
    /// the owner is no longer a participant of the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::join_project` as soon as joining the given project starts.
    RemoteProjectJoined {
        project_id: u64,
    },
    // NOTE(review): not emitted anywhere in the visible part of this file;
    // presumably raised elsewhere when an invitation to a project is dismissed — confirm.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
58
/// Client-side state of a call room: its participants, the projects shared
/// into or joined from it, and the tasks keeping that state up to date.
pub struct Room {
    /// Server-assigned room id (sent with `Call`, `ShareProject` and `RejoinRoom` requests).
    id: u64,
    /// Channel id reported by the join response; `None` for rooms made via `Room::create`.
    channel_id: Option<u64>,
    // live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Local projects shared into this room (inserted by `share_project`).
    shared_projects: HashSet<WeakModel<Project>>,
    /// Remote projects joined through this room (inserted by `join_project`).
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed in the room's `pending_participants` by the latest applied update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants (see `check_invariants`).
    participant_user_ids: HashSet<u64>,
    /// Number of `call` requests that have been started but have not finished yet.
    pending_call_count: usize,
    /// When set, the room is left once `should_leave` finds nobody else in it.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Followers per `(leader peer id, project id)`, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Client subscriptions held by the room; cleared by `clear_state`.
    client_subscriptions: Vec<client2::Subscription>,
    _subscriptions: Vec<gpui2::Subscription>,
    /// Receives `Some(())` each time an applied room update finishes.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// Task spawned by `apply_room_update`; `None` when no update is in flight.
    pending_room_update: Option<Task<()>>,
    /// Slot for the task running `Room::maintain_connection`; emptied by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
82
/// Declares [`Event`] as the event type a `Room` emits through `cx.emit(..)`.
impl EventEmitter for Room {
    type Event = Event;
}
86
87impl Room {
/// Returns the id of the channel this room belongs to, if any.
pub fn channel_id(&self) -> Option<u64> {
    self.channel_id
}
91
/// Returns `true` while at least one project is tracked in `shared_projects`.
pub fn is_sharing_project(&self) -> bool {
    !self.shared_projects.is_empty()
}
95
/// Test-only connectivity probe.
///
/// The LiveKit-backed check is still commented out below, so this currently
/// always returns `false`.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    false
    // if let Some(live_kit) = self.live_kit.as_ref() {
    //     matches!(
    //         *live_kit.room.status().borrow(),
    //         live_kit_client::ConnectionState::Connected { .. }
    //     )
    // } else {
    //     false
    // }
}
108
/// Builds a `Room` model.
///
/// NOTE: not implemented yet — the body is `todo!()`, so every call panics.
/// The commented-out code below appears to be the intended implementation
/// (LiveKit room setup, connection-maintenance task, field initialisation)
/// that has not been re-enabled yet.
fn new(
    id: u64,
    channel_id: Option<u64>,
    live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    cx: &mut ModelContext<Self>,
) -> Self {
    todo!()
    // let _live_kit_room = if let Some(connection_info) = live_kit_connection_info {
    //     let room = live_kit_client::Room::new();
    //     let mut status = room.status();
    //     // Consume the initial status of the room.
    //     let _ = status.try_recv();
    //     let _maintain_room = cx.spawn(|this, mut cx| async move {
    //         while let Some(status) = status.next().await {
    //             let this = if let Some(this) = this.upgrade() {
    //                 this
    //             } else {
    //                 break;
    //             };

    //             if status == live_kit_client::ConnectionState::Disconnected {
    //                 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
    //                     .ok();
    //                 break;
    //             }
    //         }
    //     });

    //     let mut track_video_changes = room.remote_video_track_updates();
    //     let _maintain_video_tracks = cx.spawn(|this, mut cx| async move {
    //         while let Some(track_change) = track_video_changes.next().await {
    //             let this = if let Some(this) = this.upgrade() {
    //                 this
    //             } else {
    //                 break;
    //             };

    //             this.update(&mut cx, |this, cx| {
    //                 this.remote_video_track_updated(track_change, cx).log_err()
    //             })
    //             .ok();
    //         }
    //     });

    //     let mut track_audio_changes = room.remote_audio_track_updates();
    //     let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move {
    //         while let Some(track_change) = track_audio_changes.next().await {
    //             let this = if let Some(this) = this.upgrade() {
    //                 this
    //             } else {
    //                 break;
    //             };

    //             this.update(&mut cx, |this, cx| {
    //                 this.remote_audio_track_updated(track_change, cx).log_err()
    //             })
    //             .ok();
    //         }
    //     });

    //     let connect = room.connect(&connection_info.server_url, &connection_info.token);
    //     cx.spawn(|this, mut cx| async move {
    //         connect.await?;

    //         if !cx.update(|cx| Self::mute_on_join(cx))? {
    //             this.update(&mut cx, |this, cx| this.share_microphone(cx))?
    //                 .await?;
    //         }

    //         anyhow::Ok(())
    //     })
    //     .detach_and_log_err(cx);

    //     Some(LiveKitRoom {
    //         room,
    //         screen_track: LocalTrack::None,
    //         microphone_track: LocalTrack::None,
    //         next_publish_id: 0,
    //         muted_by_user: false,
    //         deafened: false,
    //         speaking: false,
    //         _maintain_room,
    //         _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
    //     })
    // } else {
    //     None
    // };

    // let maintain_connection = cx.spawn({
    //     let client = client.clone();
    //     move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
    // });

    // Audio::play_sound(Sound::Joined, cx);

    // let (room_update_completed_tx, room_update_completed_rx) = watch::channel();

    // Self {
    //     id,
    //     channel_id,
    //     // live_kit: live_kit_room,
    //     status: RoomStatus::Online,
    //     shared_projects: Default::default(),
    //     joined_projects: Default::default(),
    //     participant_user_ids: Default::default(),
    //     local_participant: Default::default(),
    //     remote_participants: Default::default(),
    //     pending_participants: Default::default(),
    //     pending_call_count: 0,
    //     client_subscriptions: vec![
    //         client.add_message_handler(cx.weak_handle(), Self::handle_room_updated)
    //     ],
    //     _subscriptions: vec![
    //         cx.on_release(Self::released),
    //         cx.on_app_quit(Self::app_will_quit),
    //     ],
    //     leave_when_empty: false,
    //     pending_room_update: None,
    //     client,
    //     user_store,
    //     follows_by_leader_id_project_id: Default::default(),
    //     maintain_connection: Some(maintain_connection),
    //     room_update_completed_tx,
    //     room_update_completed_rx,
    // }
}
237
238 pub(crate) fn create(
239 called_user_id: u64,
240 initial_project: Option<Model<Project>>,
241 client: Arc<Client>,
242 user_store: Model<UserStore>,
243 cx: &mut AppContext,
244 ) -> Task<Result<Model<Self>>> {
245 cx.spawn(move |mut cx| async move {
246 let response = client.request(proto::CreateRoom {}).await?;
247 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
248 let room = cx.build_model(|cx| {
249 Self::new(
250 room_proto.id,
251 None,
252 response.live_kit_connection_info,
253 client,
254 user_store,
255 cx,
256 )
257 })?;
258
259 let initial_project_id = if let Some(initial_project) = initial_project {
260 let initial_project_id = room
261 .update(&mut cx, |room, cx| {
262 room.share_project(initial_project.clone(), cx)
263 })?
264 .await?;
265 Some(initial_project_id)
266 } else {
267 None
268 };
269
270 match room
271 .update(&mut cx, |room, cx| {
272 room.leave_when_empty = true;
273 room.call(called_user_id, initial_project_id, cx)
274 })?
275 .await
276 {
277 Ok(()) => Ok(room),
278 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
279 }
280 })
281 }
282
283 pub(crate) fn join_channel(
284 channel_id: u64,
285 client: Arc<Client>,
286 user_store: Model<UserStore>,
287 cx: &mut AppContext,
288 ) -> Task<Result<Model<Self>>> {
289 cx.spawn(move |cx| async move {
290 Self::from_join_response(
291 client.request(proto::JoinChannel { channel_id }).await?,
292 client,
293 user_store,
294 cx,
295 )
296 })
297 }
298
299 pub(crate) fn join(
300 call: &IncomingCall,
301 client: Arc<Client>,
302 user_store: Model<UserStore>,
303 cx: &mut AppContext,
304 ) -> Task<Result<Model<Self>>> {
305 let id = call.room_id;
306 cx.spawn(move |cx| async move {
307 Self::from_join_response(
308 client.request(proto::JoinRoom { id }).await?,
309 client,
310 user_store,
311 cx,
312 )
313 })
314 }
315
316 fn released(&mut self, cx: &mut AppContext) {
317 if self.status.is_online() {
318 self.leave_internal(cx).detach_and_log_err(cx);
319 }
320 }
321
322 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
323 let task = if self.status.is_online() {
324 let leave = self.leave_internal(cx);
325 Some(cx.executor().spawn(async move {
326 leave.await.log_err();
327 }))
328 } else {
329 None
330 };
331
332 async move {
333 if let Some(task) = task {
334 task.await;
335 }
336 }
337 }
338
339 pub fn mute_on_join(cx: &AppContext) -> bool {
340 CallSettings::get_global(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some()
341 }
342
343 fn from_join_response(
344 response: proto::JoinRoomResponse,
345 client: Arc<Client>,
346 user_store: Model<UserStore>,
347 mut cx: AsyncAppContext,
348 ) -> Result<Model<Self>> {
349 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
350 let room = cx.build_model(|cx| {
351 Self::new(
352 room_proto.id,
353 response.channel_id,
354 response.live_kit_connection_info,
355 client,
356 user_store,
357 cx,
358 )
359 })?;
360 room.update(&mut cx, |room, cx| {
361 room.leave_when_empty = room.channel_id.is_none();
362 room.apply_room_update(room_proto, cx)?;
363 anyhow::Ok(())
364 })??;
365 Ok(room)
366 }
367
368 fn should_leave(&self) -> bool {
369 self.leave_when_empty
370 && self.pending_room_update.is_none()
371 && self.pending_participants.is_empty()
372 && self.remote_participants.is_empty()
373 && self.pending_call_count == 0
374 }
375
/// Leaves the room, notifying observers and emitting [`Event::Left`] first.
///
/// The notification and event are issued even when the room is already
/// offline, in which case the returned task resolves to an error (see
/// `leave_internal`).
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    cx.notify();
    cx.emit(Event::Left);
    self.leave_internal(cx)
}
381
382 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
383 if self.status.is_offline() {
384 return Task::ready(Err(anyhow!("room is offline")));
385 }
386
387 log::info!("leaving room");
388 Audio::play_sound(Sound::Leave, cx);
389
390 self.clear_state(cx);
391
392 let leave_room = self.client.request(proto::LeaveRoom {});
393 cx.executor().spawn(async move {
394 leave_room.await?;
395 anyhow::Ok(())
396 })
397 }
398
399 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
400 for project in self.shared_projects.drain() {
401 if let Some(project) = project.upgrade() {
402 project.update(cx, |project, cx| {
403 project.unshare(cx).log_err();
404 });
405 }
406 }
407 for project in self.joined_projects.drain() {
408 if let Some(project) = project.upgrade() {
409 project.update(cx, |project, cx| {
410 project.disconnected_from_host(cx);
411 project.close(cx);
412 });
413 }
414 }
415
416 self.status = RoomStatus::Offline;
417 self.remote_participants.clear();
418 self.pending_participants.clear();
419 self.participant_user_ids.clear();
420 self.client_subscriptions.clear();
421 // self.live_kit.take();
422 self.pending_room_update.take();
423 self.maintain_connection.take();
424 }
425
/// Watches the client's connection status and tries to rejoin the room after
/// every disconnection.
///
/// Keeps looping for as long as rejoining succeeds. When rejoining fails
/// (the client signed out, the rejoin attempts ran out, the room or app was
/// dropped, or `RECONNECT_TIMEOUT` elapsed), the room is left if it still
/// exists and an error is returned. This function never returns `Ok`.
async fn maintain_connection(
    this: WeakModel<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Drain the currently buffered status so `next()` below only yields later changes.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade()
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                })?;

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse();
                // Resolves to `true` once a rejoin succeeded, `false` when giving up.
                let client_reconnection = async {
                    // Only failed rejoin attempts consume this budget; status
                    // changes while still disconnected do not.
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade() else { break };
                            match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                Ok(task) => {
                                    if task.await.log_err().is_some() {
                                        return true;
                                    } else {
                                        remaining_attempts -= 1;
                                    }
                                }
                                Err(_app_dropped) => return false,
                            }
                        } else if client_status.borrow().is_signed_out() {
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // `select_biased!` polls the arms in order, so a finished
                // reconnection wins over a timeout that fired at the same time.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            // Reached only when rejoining failed or timed out.
            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade() {
        log::info!("reconnection failed, leaving room");
        // The task returned by `leave` is not awaited here.
        let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
511
/// Asks the server to let us back into the room after a reconnection.
///
/// Sends a `RejoinRoom` request listing the projects to re-share and to
/// re-join. When the response arrives, the room is marked online, the room
/// state from the response is applied, and each affected project is handed
/// its part of the response. The returned task resolves to an error if the
/// request fails, the response has no room, or the room model is gone.
fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    // Live project handles by remote id, used to route the response back to them.
    let mut projects = HashMap::default();
    let mut reshared_projects = Vec::new();
    let mut rejoined_projects = Vec::new();
    // Shared projects are kept only if they are still alive AND have a remote id.
    self.shared_projects.retain(|project| {
        if let Some(handle) = project.upgrade() {
            let project = handle.read(cx);
            if let Some(project_id) = project.remote_id() {
                projects.insert(project_id, handle.clone());
                reshared_projects.push(proto::UpdateProject {
                    project_id,
                    worktrees: project.worktree_metadata_protos(cx),
                });
                return true;
            }
        }
        false
    });
    // Joined projects are kept whenever they are still alive; those without a
    // remote id are retained but left out of the request.
    self.joined_projects.retain(|project| {
        if let Some(handle) = project.upgrade() {
            let project = handle.read(cx);
            if let Some(project_id) = project.remote_id() {
                projects.insert(project_id, handle.clone());
                rejoined_projects.push(proto::RejoinProject {
                    id: project_id,
                    worktrees: project
                        .worktrees()
                        .map(|worktree| {
                            let worktree = worktree.read(cx);
                            proto::RejoinWorktree {
                                id: worktree.id().to_proto(),
                                scan_id: worktree.completed_scan_id() as u64,
                            }
                        })
                        .collect(),
                });
            }
            return true;
        }
        false
    });

    // The full envelope is requested because `message_id` is needed below.
    let response = self.client.request_envelope(proto::RejoinRoom {
        id: self.id,
        reshared_projects,
        rejoined_projects,
    });

    cx.spawn(|this, mut cx| async move {
        let response = response.await?;
        let message_id = response.message_id;
        let response = response.payload;
        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
        this.update(&mut cx, |this, cx| {
            this.status = RoomStatus::Online;
            this.apply_room_update(room_proto, cx)?;

            // Per-project failures are logged and do not fail the rejoin.
            for reshared_project in response.reshared_projects {
                if let Some(project) = projects.get(&reshared_project.id) {
                    project.update(cx, |project, cx| {
                        project.reshared(reshared_project, cx).log_err();
                    });
                }
            }

            for rejoined_project in response.rejoined_projects {
                if let Some(project) = projects.get(&rejoined_project.id) {
                    project.update(cx, |project, cx| {
                        project.rejoined(rejoined_project, message_id, cx).log_err();
                    });
                }
            }

            anyhow::Ok(())
        })?
    })
}
589
/// Returns the room's id.
pub fn id(&self) -> u64 {
    self.id
}
593
/// Returns the room's current status.
pub fn status(&self) -> RoomStatus {
    self.status
}
597
/// Returns the state tracked for the local participant.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
601
/// Returns the remote participants, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
605
606 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
607 self.remote_participants
608 .values()
609 .find(|p| p.peer_id == peer_id)
610 }
611
/// Returns the users currently listed as pending participants of the room.
pub fn pending_participants(&self) -> &[Arc<User>] {
    &self.pending_participants
}
615
/// Returns `true` if `user_id` is among the tracked participant user ids
/// (remote and pending participants).
pub fn contains_participant(&self, user_id: u64) -> bool {
    self.participant_user_ids.contains(&user_id)
}
619
620 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
621 self.follows_by_leader_id_project_id
622 .get(&(leader_id, project_id))
623 .map_or(&[], |v| v.as_slice())
624 }
625
626 /// Returns the most 'active' projects, defined as most people in the project
627 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
628 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
629 for participant in self.remote_participants.values() {
630 match participant.location {
631 ParticipantLocation::SharedProject { project_id } => {
632 project_hosts_and_guest_counts
633 .entry(project_id)
634 .or_default()
635 .1 += 1;
636 }
637 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
638 }
639 for project in &participant.projects {
640 project_hosts_and_guest_counts
641 .entry(project.id)
642 .or_default()
643 .0 = Some(participant.user.id);
644 }
645 }
646
647 if let Some(user) = self.user_store.read(cx).current_user() {
648 for project in &self.local_participant.projects {
649 project_hosts_and_guest_counts
650 .entry(project.id)
651 .or_default()
652 .0 = Some(user.id);
653 }
654 }
655
656 project_hosts_and_guest_counts
657 .into_iter()
658 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
659 .max_by_key(|(_, _, guest_count)| *guest_count)
660 .map(|(id, host, _)| (id, host))
661 }
662
663 async fn handle_room_updated(
664 this: Model<Self>,
665 envelope: TypedEnvelope<proto::RoomUpdated>,
666 _: Arc<Client>,
667 mut cx: AsyncAppContext,
668 ) -> Result<()> {
669 let room = envelope
670 .payload
671 .room
672 .ok_or_else(|| anyhow!("invalid room"))?;
673 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
674 }
675
/// Reconciles the local room state with a room snapshot from the server.
///
/// The synchronous part only removes the local user from the participant
/// list and starts fetching the users of the remote and pending
/// participants. The actual reconciliation runs in a spawned task, stored in
/// `pending_room_update`, once both fetches have finished: participants,
/// their projects and locations, and the follower map are updated, the
/// matching events are emitted, and the room is left if `should_leave` says
/// so. As written, this function always returns `Ok(())`; failures inside the
/// spawned task are logged or dropped.
fn apply_room_update(
    &mut self,
    mut room: proto::Room,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    // Filter ourselves out from the room's participants.
    let local_participant_ix = room
        .participants
        .iter()
        .position(|participant| Some(participant.user_id) == self.client.user_id());
    let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

    let pending_participant_user_ids = room
        .pending_participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    let remote_participant_user_ids = room
        .participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    // Start both user lookups; they are awaited inside the spawned task below.
    let (remote_participants, pending_participants) =
        self.user_store.update(cx, move |user_store, cx| {
            (
                user_store.get_users(remote_participant_user_ids, cx),
                user_store.get_users(pending_participant_user_ids, cx),
            )
        });

    // Assigning here drops whatever update task was stored previously.
    self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
        let (remote_participants, pending_participants) =
            futures::join!(remote_participants, pending_participants);

        this.update(&mut cx, |this, cx| {
            this.participant_user_ids.clear();

            if let Some(participant) = local_participant {
                this.local_participant.projects = participant.projects;
            } else {
                this.local_participant.projects.clear();
            }

            if let Some(participants) = remote_participants.log_err() {
                // NOTE(review): pairing by position assumes `get_users` returns
                // users in the order of the requested ids — confirm.
                for (participant, user) in room.participants.into_iter().zip(participants) {
                    // Participants without a peer id are skipped entirely.
                    let Some(peer_id) = participant.peer_id else {
                        continue;
                    };
                    let participant_index = ParticipantIndex(participant.participant_index);
                    this.participant_user_ids.insert(participant.user_id);

                    // Diff the participant's previously known projects against the new list.
                    let old_projects = this
                        .remote_participants
                        .get(&participant.user_id)
                        .into_iter()
                        .flat_map(|existing| &existing.projects)
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();
                    let new_projects = participant
                        .projects
                        .iter()
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();

                    for project in &participant.projects {
                        if !old_projects.contains(&project.id) {
                            cx.emit(Event::RemoteProjectShared {
                                owner: user.clone(),
                                project_id: project.id,
                                worktree_root_names: project.worktree_root_names.clone(),
                            });
                        }
                    }

                    for unshared_project_id in old_projects.difference(&new_projects) {
                        // Drop joined projects that were just unshared (after
                        // disconnecting them) as well as handles that are dead.
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade() {
                                project.update(cx, |project, cx| {
                                    if project.remote_id() == Some(*unshared_project_id) {
                                        project.disconnected_from_host(cx);
                                        false
                                    } else {
                                        true
                                    }
                                })
                            } else {
                                false
                            }
                        });
                        cx.emit(Event::RemoteProjectUnshared {
                            project_id: *unshared_project_id,
                        });
                    }

                    // A location that fails to convert falls back to `External`.
                    let location = ParticipantLocation::from_proto(participant.location)
                        .unwrap_or(ParticipantLocation::External);
                    if let Some(remote_participant) =
                        this.remote_participants.get_mut(&participant.user_id)
                    {
                        // Known participant: refresh in place.
                        remote_participant.peer_id = peer_id;
                        remote_participant.projects = participant.projects;
                        remote_participant.participant_index = participant_index;
                        if location != remote_participant.location {
                            remote_participant.location = location;
                            cx.emit(Event::ParticipantLocationChanged {
                                participant_id: peer_id,
                            });
                        }
                    } else {
                        // New participant: starts out muted and not speaking.
                        this.remote_participants.insert(
                            participant.user_id,
                            RemoteParticipant {
                                user: user.clone(),
                                participant_index,
                                peer_id,
                                projects: participant.projects,
                                location,
                                muted: true,
                                speaking: false,
                                // video_tracks: Default::default(),
                                // audio_tracks: Default::default(),
                            },
                        );

                        Audio::play_sound(Sound::Joined, cx);

                        // if let Some(live_kit) = this.live_kit.as_ref() {
                        //     let video_tracks =
                        //         live_kit.room.remote_video_tracks(&user.id.to_string());
                        //     let audio_tracks =
                        //         live_kit.room.remote_audio_tracks(&user.id.to_string());
                        //     let publications = live_kit
                        //         .room
                        //         .remote_audio_track_publications(&user.id.to_string());

                        //     for track in video_tracks {
                        //         this.remote_video_track_updated(
                        //             RemoteVideoTrackUpdate::Subscribed(track),
                        //             cx,
                        //         )
                        //         .log_err();
                        //     }

                        //     for (track, publication) in
                        //         audio_tracks.iter().zip(publications.iter())
                        //     {
                        //         this.remote_audio_track_updated(
                        //             RemoteAudioTrackUpdate::Subscribed(
                        //                 track.clone(),
                        //                 publication.clone(),
                        //             ),
                        //             cx,
                        //         )
                        //         .log_err();
                        //     }
                        // }
                    }
                }

                // Remove participants missing from this update and report
                // their projects as unshared.
                this.remote_participants.retain(|user_id, participant| {
                    if this.participant_user_ids.contains(user_id) {
                        true
                    } else {
                        for project in &participant.projects {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: project.id,
                            });
                        }
                        false
                    }
                });
            }

            if let Some(pending_participants) = pending_participants.log_err() {
                this.pending_participants = pending_participants;
                for participant in &this.pending_participants {
                    this.participant_user_ids.insert(participant.id);
                }
            }

            // The follower map is rebuilt from scratch on every update.
            this.follows_by_leader_id_project_id.clear();
            for follower in room.followers {
                let project_id = follower.project_id;
                let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                    (Some(leader), Some(follower)) => (leader, follower),

                    _ => {
                        log::error!("Follower message {follower:?} missing some state");
                        continue;
                    }
                };

                let list = this
                    .follows_by_leader_id_project_id
                    .entry((leader, project_id))
                    .or_insert(Vec::new());
                if !list.contains(&follower) {
                    list.push(follower);
                }
            }

            // Clear the slot first: `should_leave` requires that no update is pending.
            this.pending_room_update.take();
            if this.should_leave() {
                log::info!("room is empty, leaving");
                let _ = this.leave(cx);
            }

            this.user_store.update(cx, |user_store, cx| {
                let participant_indices_by_user_id = this
                    .remote_participants
                    .iter()
                    .map(|(user_id, participant)| (*user_id, participant.participant_index))
                    .collect();
                user_store.set_participant_indices(participant_indices_by_user_id, cx);
            });

            this.check_invariants();
            // Signal futures returned by `room_update_completed`.
            this.room_update_completed_tx.try_send(Some(())).ok();
            cx.notify();
        })
        .ok();
    }));

    cx.notify();
    Ok(())
}
904
905 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
906 let mut done_rx = self.room_update_completed_rx.clone();
907 async move {
908 while let Some(result) = done_rx.next().await {
909 if result.is_some() {
910 break;
911 }
912 }
913 }
914 }
915
/// Handles a change to a remote participant's video tracks.
///
/// NOTE: not implemented yet — the leading `todo!()` panics on every call, so
/// the code after it is currently unreachable. That code looks up the
/// publishing participant by the user id parsed from the publisher id and
/// emits [`Event::RemoteVideoTracksChanged`]; storing and removing the track
/// itself is still commented out.
fn remote_video_track_updated(
    &mut self,
    change: RemoteVideoTrackUpdate,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    todo!();
    match change {
        RemoteVideoTrackUpdate::Subscribed(track) => {
            let user_id = track.publisher_id().parse()?;
            let track_id = track.sid().to_string();
            let participant = self
                .remote_participants
                .get_mut(&user_id)
                .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
            // participant.video_tracks.insert(
            //     track_id.clone(),
            //     Arc::new(RemoteVideoTrack {
            //         live_kit_track: track,
            //     }),
            // );
            cx.emit(Event::RemoteVideoTracksChanged {
                participant_id: participant.peer_id,
            });
        }
        RemoteVideoTrackUpdate::Unsubscribed {
            publisher_id,
            track_id,
        } => {
            let user_id = publisher_id.parse()?;
            let participant = self
                .remote_participants
                .get_mut(&user_id)
                .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
            // participant.video_tracks.remove(&track_id);
            cx.emit(Event::RemoteVideoTracksChanged {
                participant_id: participant.peer_id,
            });
        }
    }

    cx.notify();
    Ok(())
}
959
960 fn remote_audio_track_updated(
961 &mut self,
962 change: RemoteAudioTrackUpdate,
963 cx: &mut ModelContext<Self>,
964 ) -> Result<()> {
965 match change {
966 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
967 let mut speaker_ids = speakers
968 .into_iter()
969 .filter_map(|speaker_sid| speaker_sid.parse().ok())
970 .collect::<Vec<u64>>();
971 speaker_ids.sort_unstable();
972 for (sid, participant) in &mut self.remote_participants {
973 if let Ok(_) = speaker_ids.binary_search(sid) {
974 participant.speaking = true;
975 } else {
976 participant.speaking = false;
977 }
978 }
979 // todo!()
980 // if let Some(id) = self.client.user_id() {
981 // if let Some(room) = &mut self.live_kit {
982 // if let Ok(_) = speaker_ids.binary_search(&id) {
983 // room.speaking = true;
984 // } else {
985 // room.speaking = false;
986 // }
987 // }
988 // }
989 cx.notify();
990 }
991 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
992 // todo!()
993 // let mut found = false;
994 // for participant in &mut self.remote_participants.values_mut() {
995 // for track in participant.audio_tracks.values() {
996 // if track.sid() == track_id {
997 // found = true;
998 // break;
999 // }
1000 // }
1001 // if found {
1002 // participant.muted = muted;
1003 // break;
1004 // }
1005 // }
1006
1007 cx.notify();
1008 }
1009 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1010 // todo!()
1011 // let user_id = track.publisher_id().parse()?;
1012 // let track_id = track.sid().to_string();
1013 // let participant = self
1014 // .remote_participants
1015 // .get_mut(&user_id)
1016 // .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1017 // // participant.audio_tracks.insert(track_id.clone(), track);
1018 // participant.muted = publication.is_muted();
1019
1020 // cx.emit(Event::RemoteAudioTracksChanged {
1021 // participant_id: participant.peer_id,
1022 // });
1023 }
1024 RemoteAudioTrackUpdate::Unsubscribed {
1025 publisher_id,
1026 track_id,
1027 } => {
1028 // todo!()
1029 // let user_id = publisher_id.parse()?;
1030 // let participant = self
1031 // .remote_participants
1032 // .get_mut(&user_id)
1033 // .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1034 // participant.audio_tracks.remove(&track_id);
1035 // cx.emit(Event::RemoteAudioTracksChanged {
1036 // participant_id: participant.peer_id,
1037 // });
1038 }
1039 }
1040
1041 cx.notify();
1042 Ok(())
1043 }
1044
/// Asserts internal consistency of the participant bookkeeping.
///
/// The checks are compiled only for tests / the `test-support` feature;
/// otherwise this is a no-op.
///
/// # Panics
/// In test builds, panics if an invariant is violated, or if the client has
/// no user id while at least one remote or pending participant exists.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        // Every remote participant is tracked by id and is never the local user.
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        // The same holds for pending participants.
        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        // The id set holds exactly the remote and pending participants.
        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
1064
1065 pub(crate) fn call(
1066 &mut self,
1067 called_user_id: u64,
1068 initial_project_id: Option<u64>,
1069 cx: &mut ModelContext<Self>,
1070 ) -> Task<Result<()>> {
1071 if self.status.is_offline() {
1072 return Task::ready(Err(anyhow!("room is offline")));
1073 }
1074
1075 cx.notify();
1076 let client = self.client.clone();
1077 let room_id = self.id;
1078 self.pending_call_count += 1;
1079 cx.spawn(move |this, mut cx| async move {
1080 let result = client
1081 .request(proto::Call {
1082 room_id,
1083 called_user_id,
1084 initial_project_id,
1085 })
1086 .await;
1087 this.update(&mut cx, |this, cx| {
1088 this.pending_call_count -= 1;
1089 if this.should_leave() {
1090 this.leave(cx).detach_and_log_err(cx);
1091 }
1092 })?;
1093 result?;
1094 Ok(())
1095 })
1096 }
1097
1098 pub fn join_project(
1099 &mut self,
1100 id: u64,
1101 language_registry: Arc<LanguageRegistry>,
1102 fs: Arc<dyn Fs>,
1103 cx: &mut ModelContext<Self>,
1104 ) -> Task<Result<Model<Project>>> {
1105 let client = self.client.clone();
1106 let user_store = self.user_store.clone();
1107 cx.emit(Event::RemoteProjectJoined { project_id: id });
1108 cx.spawn(move |this, mut cx| async move {
1109 let project =
1110 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1111
1112 this.update(&mut cx, |this, cx| {
1113 this.joined_projects.retain(|project| {
1114 if let Some(project) = project.upgrade() {
1115 !project.read(cx).is_read_only()
1116 } else {
1117 false
1118 }
1119 });
1120 this.joined_projects.insert(project.downgrade());
1121 })?;
1122 Ok(project)
1123 })
1124 }
1125
1126 pub(crate) fn share_project(
1127 &mut self,
1128 project: Model<Project>,
1129 cx: &mut ModelContext<Self>,
1130 ) -> Task<Result<u64>> {
1131 if let Some(project_id) = project.read(cx).remote_id() {
1132 return Task::ready(Ok(project_id));
1133 }
1134
1135 let request = self.client.request(proto::ShareProject {
1136 room_id: self.id(),
1137 worktrees: project.read(cx).worktree_metadata_protos(cx),
1138 });
1139 cx.spawn(|this, mut cx| async move {
1140 let response = request.await?;
1141
1142 project.update(&mut cx, |project, cx| {
1143 project.shared(response.project_id, cx)
1144 })??;
1145
1146 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1147 this.update(&mut cx, |this, cx| {
1148 this.shared_projects.insert(project.downgrade());
1149 let active_project = this.local_participant.active_project.as_ref();
1150 if active_project.map_or(false, |location| *location == project) {
1151 this.set_location(Some(&project), cx)
1152 } else {
1153 Task::ready(Ok(()))
1154 }
1155 })?
1156 .await?;
1157
1158 Ok(response.project_id)
1159 })
1160 }
1161
1162 pub(crate) fn unshare_project(
1163 &mut self,
1164 project: Model<Project>,
1165 cx: &mut ModelContext<Self>,
1166 ) -> Result<()> {
1167 let project_id = match project.read(cx).remote_id() {
1168 Some(project_id) => project_id,
1169 None => return Ok(()),
1170 };
1171
1172 self.client.send(proto::UnshareProject { project_id })?;
1173 project.update(cx, |this, cx| this.unshare(cx))
1174 }
1175
1176 pub(crate) fn set_location(
1177 &mut self,
1178 project: Option<&Model<Project>>,
1179 cx: &mut ModelContext<Self>,
1180 ) -> Task<Result<()>> {
1181 if self.status.is_offline() {
1182 return Task::ready(Err(anyhow!("room is offline")));
1183 }
1184
1185 let client = self.client.clone();
1186 let room_id = self.id;
1187 let location = if let Some(project) = project {
1188 self.local_participant.active_project = Some(project.downgrade());
1189 if let Some(project_id) = project.read(cx).remote_id() {
1190 proto::participant_location::Variant::SharedProject(
1191 proto::participant_location::SharedProject { id: project_id },
1192 )
1193 } else {
1194 proto::participant_location::Variant::UnsharedProject(
1195 proto::participant_location::UnsharedProject {},
1196 )
1197 }
1198 } else {
1199 self.local_participant.active_project = None;
1200 proto::participant_location::Variant::External(proto::participant_location::External {})
1201 };
1202
1203 cx.notify();
1204 cx.executor().spawn_on_main(move || async move {
1205 client
1206 .request(proto::UpdateParticipantLocation {
1207 room_id,
1208 location: Some(proto::ParticipantLocation {
1209 variant: Some(location),
1210 }),
1211 })
1212 .await?;
1213 Ok(())
1214 })
1215 }
1216
    /// Intended to report whether the local screen track is anything other than
    /// `LocalTrack::None` (see the commented-out body).
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`. The reference implementation
    /// below reads a `live_kit` field, which is commented out on `Room`.
    pub fn is_screen_sharing(&self) -> bool {
        todo!()
        // self.live_kit.as_ref().map_or(false, |live_kit| {
        //     !matches!(live_kit.screen_track, LocalTrack::None)
        // })
    }
1223
    /// Intended to report whether the local microphone track is anything other than
    /// `LocalTrack::None` (see the commented-out body).
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`. The reference implementation
    /// below reads a `live_kit` field, which is commented out on `Room`.
    pub fn is_sharing_mic(&self) -> bool {
        todo!()
        // self.live_kit.as_ref().map_or(false, |live_kit| {
        //     !matches!(live_kit.microphone_track, LocalTrack::None)
        // })
    }
1230
    /// Intended to report whether the local microphone is muted.
    ///
    /// Per the commented-out body: with no microphone track the answer comes from
    /// `Self::mute_on_join(cx)`, a pending or published track reports its own
    /// `muted` flag, and without LiveKit state the answer is `false`.
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    pub fn is_muted(&self, cx: &AppContext) -> bool {
        todo!()
        // self.live_kit
        //     .as_ref()
        //     .and_then(|live_kit| match &live_kit.microphone_track {
        //         LocalTrack::None => Some(Self::mute_on_join(cx)),
        //         LocalTrack::Pending { muted, .. } => Some(*muted),
        //         LocalTrack::Published { muted, .. } => Some(*muted),
        //     })
        //     .unwrap_or(false)
    }
1242
    /// Intended to return the LiveKit state's `speaking` flag, or `false` when there
    /// is no LiveKit state (see the commented-out body).
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    pub fn is_speaking(&self) -> bool {
        todo!()
        // self.live_kit
        //     .as_ref()
        //     .map_or(false, |live_kit| live_kit.speaking)
    }
1249
    /// Intended to return the LiveKit state's `deafened` flag, or `None` when there
    /// is no LiveKit state (see the commented-out line).
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    pub fn is_deafened(&self) -> Option<bool> {
        // self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
        todo!()
    }
1254
    /// Intended to publish the local microphone as an audio track.
    ///
    /// The commented-out reference implementation marks the microphone track as
    /// `LocalTrack::Pending` with a fresh publish id, publishes asynchronously, and
    /// then either stores the publication as `LocalTrack::Published` or, if the
    /// pending state was replaced in the meantime, treats the attempt as canceled.
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    #[track_caller]
    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        todo!()
        // if self.status.is_offline() {
        //     return Task::ready(Err(anyhow!("room is offline")));
        // } else if self.is_sharing_mic() {
        //     return Task::ready(Err(anyhow!("microphone was already shared")));
        // }

        // let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
        //     let publish_id = post_inc(&mut live_kit.next_publish_id);
        //     live_kit.microphone_track = LocalTrack::Pending {
        //         publish_id,
        //         muted: false,
        //     };
        //     cx.notify();
        //     publish_id
        // } else {
        //     return Task::ready(Err(anyhow!("live-kit was not initialized")));
        // };

        // cx.spawn(move |this, mut cx| async move {
        //     let publish_track = async {
        //         let track = LocalAudioTrack::create();
        //         this.upgrade()
        //             .ok_or_else(|| anyhow!("room was dropped"))?
        //             .update(&mut cx, |this, _| {
        //                 this.live_kit
        //                     .as_ref()
        //                     .map(|live_kit| live_kit.room.publish_audio_track(track))
        //             })?
        //             .ok_or_else(|| anyhow!("live-kit was not initialized"))?
        //             .await
        //     };

        //     let publication = publish_track.await;
        //     this.upgrade()
        //         .ok_or_else(|| anyhow!("room was dropped"))?
        //         .update(&mut cx, |this, cx| {
        //             let live_kit = this
        //                 .live_kit
        //                 .as_mut()
        //                 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

        //             let (canceled, muted) = if let LocalTrack::Pending {
        //                 publish_id: cur_publish_id,
        //                 muted,
        //             } = &live_kit.microphone_track
        //             {
        //                 (*cur_publish_id != publish_id, *muted)
        //             } else {
        //                 (true, false)
        //             };

        //             match publication {
        //                 Ok(publication) => {
        //                     if canceled {
        //                         live_kit.room.unpublish_track(publication);
        //                     } else {
        //                         if muted {
        //                             cx.executor().spawn(publication.set_mute(muted)).detach();
        //                         }
        //                         live_kit.microphone_track = LocalTrack::Published {
        //                             track_publication: publication,
        //                             muted,
        //                         };
        //                         cx.notify();
        //                     }
        //                     Ok(())
        //                 }
        //                 Err(error) => {
        //                     if canceled {
        //                         Ok(())
        //                     } else {
        //                         live_kit.microphone_track = LocalTrack::None;
        //                         cx.notify();
        //                         Err(error)
        //                     }
        //                 }
        //             }
        //         })?
        // })
    }
1338
    /// Intended to publish the first available display as a screen-share video track.
    ///
    /// The commented-out reference implementation marks the screen track as
    /// `LocalTrack::Pending` with a fresh publish id, publishes asynchronously, and
    /// then either stores the publication as `LocalTrack::Published` or, if the
    /// pending state was replaced in the meantime, treats the attempt as canceled.
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        todo!()
        // if self.status.is_offline() {
        //     return Task::ready(Err(anyhow!("room is offline")));
        // } else if self.is_screen_sharing() {
        //     return Task::ready(Err(anyhow!("screen was already shared")));
        // }

        // let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
        //     let publish_id = post_inc(&mut live_kit.next_publish_id);
        //     live_kit.screen_track = LocalTrack::Pending {
        //         publish_id,
        //         muted: false,
        //     };
        //     cx.notify();
        //     (live_kit.room.display_sources(), publish_id)
        // } else {
        //     return Task::ready(Err(anyhow!("live-kit was not initialized")));
        // };

        // cx.spawn(move |this, mut cx| async move {
        //     let publish_track = async {
        //         let displays = displays.await?;
        //         let display = displays
        //             .first()
        //             .ok_or_else(|| anyhow!("no display found"))?;
        //         let track = LocalVideoTrack::screen_share_for_display(&display);
        //         this.upgrade()
        //             .ok_or_else(|| anyhow!("room was dropped"))?
        //             .update(&mut cx, |this, _| {
        //                 this.live_kit
        //                     .as_ref()
        //                     .map(|live_kit| live_kit.room.publish_video_track(track))
        //             })?
        //             .ok_or_else(|| anyhow!("live-kit was not initialized"))?
        //             .await
        //     };

        //     let publication = publish_track.await;
        //     this.upgrade()
        //         .ok_or_else(|| anyhow!("room was dropped"))?
        //         .update(&mut cx, |this, cx| {
        //             let live_kit = this
        //                 .live_kit
        //                 .as_mut()
        //                 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

        //             let (canceled, muted) = if let LocalTrack::Pending {
        //                 publish_id: cur_publish_id,
        //                 muted,
        //             } = &live_kit.screen_track
        //             {
        //                 (*cur_publish_id != publish_id, *muted)
        //             } else {
        //                 (true, false)
        //             };

        //             match publication {
        //                 Ok(publication) => {
        //                     if canceled {
        //                         live_kit.room.unpublish_track(publication);
        //                     } else {
        //                         if muted {
        //                             cx.executor().spawn(publication.set_mute(muted)).detach();
        //                         }
        //                         live_kit.screen_track = LocalTrack::Published {
        //                             track_publication: publication,
        //                             muted,
        //                         };
        //                         cx.notify();
        //                     }

        //                     Audio::play_sound(Sound::StartScreenshare, cx);

        //                     Ok(())
        //                 }
        //                 Err(error) => {
        //                     if canceled {
        //                         Ok(())
        //                     } else {
        //                         live_kit.screen_track = LocalTrack::None;
        //                         cx.notify();
        //                         Err(error)
        //                     }
        //                 }
        //             }
        //         })?
        // })
    }
1428
    /// Intended to flip the local microphone's mute state.
    ///
    /// Per the commented-out reference implementation: if no microphone track
    /// exists yet, the microphone is shared instead; otherwise the mute state is set
    /// to the inverse of `is_muted`, `muted_by_user` is updated, and unmuting while
    /// deafened also toggles deafening.
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
        todo!()
        // let should_mute = !self.is_muted(cx);
        // if let Some(live_kit) = self.live_kit.as_mut() {
        //     if matches!(live_kit.microphone_track, LocalTrack::None) {
        //         return Ok(self.share_microphone(cx));
        //     }

        //     let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
        //     live_kit.muted_by_user = should_mute;

        //     if old_muted == true && live_kit.deafened == true {
        //         if let Some(task) = self.toggle_deafen(cx).ok() {
        //             task.detach();
        //         }
        //     }

        //     Ok(ret_task)
        // } else {
        //     Err(anyhow!("LiveKit not started"))
        // }
    }
1451
    /// Intended to flip the local user's deafened state.
    ///
    /// Per the commented-out reference implementation: the `deafened` flag is
    /// inverted, the microphone is muted or unmuted to match (unless the user had
    /// muted it manually), and every remote audio track publication is enabled or
    /// disabled accordingly.
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
        todo!()
        // if let Some(live_kit) = self.live_kit.as_mut() {
        //     (*live_kit).deafened = !live_kit.deafened;

        //     let mut tasks = Vec::with_capacity(self.remote_participants.len());
        //     // Context notification is sent within set_mute itself.
        //     let mut mute_task = None;
        //     // When deafening, mute user's mic as well.
        //     // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
        //     if live_kit.deafened || !live_kit.muted_by_user {
        //         mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
        //     };
        //     for participant in self.remote_participants.values() {
        //         for track in live_kit
        //             .room
        //             .remote_audio_track_publications(&participant.user.id.to_string())
        //         {
        //             let deafened = live_kit.deafened;
        //             tasks.push(
        //                 cx.executor()
        //                     .spawn_on_main(move || track.set_enabled(!deafened)),
        //             );
        //         }
        //     }

        //     Ok(cx.executor().spawn_on_main(|| async {
        //         if let Some(mute_task) = mute_task {
        //             mute_task.await?;
        //         }
        //         for task in tasks {
        //             task.await?;
        //         }
        //         Ok(())
        //     }))
        // } else {
        //     Err(anyhow!("LiveKit not started"))
        // }
    }
1491
    /// Intended to stop sharing the local screen.
    ///
    /// Only the offline guard is live. The commented-out reference implementation
    /// takes the current screen track, unpublishes it if it was published, and
    /// reports an error if the screen was not being shared.
    ///
    /// # Errors
    ///
    /// Returns an error if the room is offline.
    ///
    /// # Panics
    ///
    /// Panics via `todo!()` whenever the room is not offline.
    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
        if self.status.is_offline() {
            return Err(anyhow!("room is offline"));
        }

        todo!()
        // let live_kit = self
        //     .live_kit
        //     .as_mut()
        //     .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
        // match mem::take(&mut live_kit.screen_track) {
        //     LocalTrack::None => Err(anyhow!("screen was not shared")),
        //     LocalTrack::Pending { .. } => {
        //         cx.notify();
        //         Ok(())
        //     }
        //     LocalTrack::Published {
        //         track_publication, ..
        //     } => {
        //         live_kit.room.unpublish_track(track_publication);
        //         cx.notify();

        //         Audio::play_sound(Sound::StopScreenshare, cx);
        //         Ok(())
        //     }
        // }
    }
1519
    /// Test-support hook intended to forward `sources` to the LiveKit room's
    /// `set_display_sources` (see the commented-out body).
    ///
    /// Only compiled for tests or with the `test-support` feature.
    ///
    /// # Panics
    ///
    /// Always panics: the body is currently `todo!()`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
        todo!()
        // self.live_kit
        //     .as_ref()
        //     .unwrap()
        //     .room
        //     .set_display_sources(sources);
    }
1529}
1530
/// Per-room LiveKit state: the client room handle plus the local track and
/// mute/deafen bookkeeping that `set_mute` operates on.
///
/// NOTE(review): the `live_kit` field that would hold this on `Room` is currently
/// commented out, so nothing visible here constructs this struct.
struct LiveKitRoom {
    /// Shared handle to the underlying LiveKit client room.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track; read and updated by `set_mute`.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    muted_by_user: bool,
    /// Presumably whether incoming audio is currently disabled for the local user
    /// — TODO confirm against `Room::toggle_deafen` once it is restored.
    deafened: bool,
    /// Presumably whether the local user is currently detected as speaking — TODO confirm.
    speaking: bool,
    /// Counter from which publish ids for `LocalTrack::Pending` are drawn
    /// (per the commented-out `share_microphone` / `share_screen` bodies).
    next_publish_id: usize,
    /// Task handle owned by this struct; never read directly (note the underscore prefix).
    _maintain_room: Task<()>,
    /// Pair of task handles owned by this struct; never read directly.
    _maintain_tracks: [Task<()>; 2],
}
1543
1544impl LiveKitRoom {
1545 fn set_mute(
1546 self: &mut LiveKitRoom,
1547 should_mute: bool,
1548 cx: &mut ModelContext<Room>,
1549 ) -> Result<(Task<Result<()>>, bool)> {
1550 if !should_mute {
1551 // clear user muting state.
1552 self.muted_by_user = false;
1553 }
1554
1555 let (result, old_muted) = match &mut self.microphone_track {
1556 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1557 LocalTrack::Pending { muted, .. } => {
1558 let old_muted = *muted;
1559 *muted = should_mute;
1560 cx.notify();
1561 Ok((Task::Ready(Some(Ok(()))), old_muted))
1562 }
1563 LocalTrack::Published {
1564 track_publication,
1565 muted,
1566 } => {
1567 let old_muted = *muted;
1568 *muted = should_mute;
1569 cx.notify();
1570 Ok((
1571 cx.executor().spawn(track_publication.set_mute(*muted)),
1572 old_muted,
1573 ))
1574 }
1575 }?;
1576
1577 if old_muted != should_mute {
1578 if should_mute {
1579 Audio::play_sound(Sound::Mute, cx);
1580 } else {
1581 Audio::play_sound(Sound::Unmute, cx);
1582 }
1583 }
1584
1585 Ok((result, old_muted))
1586 }
1587}
1588
1589enum LocalTrack {
1590 None,
1591 Pending {
1592 publish_id: usize,
1593 muted: bool,
1594 },
1595 Published {
1596 track_publication: LocalTrackPublication,
1597 muted: bool,
1598 },
1599}
1600
1601impl Default for LocalTrack {
1602 fn default() -> Self {
1603 Self::None
1604 }
1605}
1606
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`]; `Rejoining` does not count
    /// as offline.
    pub fn is_offline(&self) -> bool {
        match self {
            Self::Offline => true,
            Self::Online | Self::Rejoining => false,
        }
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not count
    /// as online.
    pub fn is_online(&self) -> bool {
        match self {
            Self::Online => true,
            Self::Rejoining | Self::Offline => false,
        }
    }
}