1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
15use language::LanguageRegistry;
16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
17use postage::{sink::Sink, stream::Stream, watch};
18use project::Project;
19use settings::Settings as _;
20use std::{future::Future, mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
/// Upper bound on how long `Room::maintain_connection` waits for the client to reconnect and
/// rejoin the room after a disconnection before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// NOTE(review): not emitted in the part of the file visible here; presumably signals that
    /// the local user joined a room (optionally tied to a channel) — confirm at the emit site.
    RoomJoined {
        channel_id: Option<ChannelId>,
    },
    /// Emitted when a room update changes the location or the role of an already-known remote
    /// participant.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted after a video track was added to or removed from a remote participant.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted after an audio track was added to or removed from a remote participant.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted when a room update lists a project for a remote participant that was not in
    /// that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// Emitted when a remote participant's project is missing from a room update, or when the
    /// participant owning the project is no longer part of the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::join_project` when joining starts, before the project has been loaded.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): not emitted in the part of the file visible here — confirm semantics at
    /// the emit site.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// NOTE(review): not emitted in the part of the file visible here; presumably signals that
    /// the local user left the room — confirm at the emit site.
    RoomLeft {
        channel_id: Option<ChannelId>,
    },
}
57
/// Client-side state of a call room: its participants, the projects shared into or joined
/// from it, the optional LiveKit session, and the tasks that keep it in sync with the server.
pub struct Room {
    // Room id taken from the server's room proto.
    id: u64,
    // Channel this room belongs to; `None` for rooms built by `Room::create`.
    channel_id: Option<ChannelId>,
    // LiveKit state; `None` when no connection info was supplied or after `clear_state`.
    live_kit: Option<LiveKitRoom>,
    // Online / rejoining / offline state of the room.
    status: RoomStatus,
    // Local projects shared into this room (weak, so dropped projects do not stay alive).
    shared_projects: HashSet<WeakEntity<Project>>,
    // Remote projects joined through this room.
    joined_projects: HashSet<WeakEntity<Project>>,
    local_participant: LocalParticipant,
    // Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    // Users listed as pending in the most recent room update.
    pending_participants: Vec<Arc<User>>,
    // User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    // Number of `call` requests that have not completed yet.
    pending_call_count: usize,
    // When true, the room is left automatically once `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Entity<UserStore>,
    // Followers for each (leader peer id, project id) pair, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Message handler registrations on the client; cleared by `clear_state`.
    client_subscriptions: Vec<client::Subscription>,
    // Release and app-quit hooks, held only to keep them registered.
    _subscriptions: Vec<gpui::Subscription>,
    // Receives `Some(())` each time a room update has been fully applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    // Task applying the most recent room update, if it has not finished yet.
    pending_room_update: Option<Task<()>>,
    // Task running `Room::maintain_connection`; taken (dropped) by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
81
// Lets `Room` emit `Event` values through its context (`cx.emit`).
impl EventEmitter<Event> for Room {}
83
84impl Room {
    /// Returns the id of the channel this room belongs to, if any.
    pub fn channel_id(&self) -> Option<ChannelId> {
        self.channel_id
    }
88
    /// Returns `true` while at least one project is tracked as shared into this room.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
92
93 #[cfg(any(test, feature = "test-support"))]
94 pub fn is_connected(&self) -> bool {
95 if let Some(live_kit) = self.live_kit.as_ref() {
96 matches!(
97 *live_kit.room.status().borrow(),
98 livekit_client_macos::ConnectionState::Connected { .. }
99 )
100 } else {
101 false
102 }
103 }
104
    /// Constructs a `Room` from the id and connection info returned by the server.
    ///
    /// When `live_kit_connection_info` is present, a LiveKit room is created, two tasks are
    /// spawned (one watching its connection status, one forwarding its updates to
    /// `live_kit_room_updated`), and a connection attempt is started; once it succeeds the
    /// microphone is shared unless the user is muted or deafened.
    ///
    /// Regardless of LiveKit, this also spawns `Self::maintain_connection`, plays the "joined"
    /// sound, registers the `RoomUpdated` message handler and the release / app-quit hooks.
    /// The room starts out `Online` with `leave_when_empty` disabled.
    fn new(
        id: u64,
        channel_id: Option<ChannelId>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: Entity<UserStore>,
        cx: &mut Context<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = livekit_client_macos::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports `Disconnected`; stop watching if the
            // `Room` entity has been dropped.
            let _maintain_room = cx.spawn(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade() {
                        this
                    } else {
                        break;
                    };

                    if status == livekit_client_macos::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
                            .ok();
                        break;
                    }
                }
            });

            // Forward every LiveKit update to `live_kit_room_updated`, logging failures, until
            // the update stream ends or the `Room` entity is dropped.
            let _handle_updates = cx.spawn({
                let room = room.clone();
                move |this, mut cx| async move {
                    let mut updates = room.updates();
                    while let Some(update) = updates.next().await {
                        let this = if let Some(this) = this.upgrade() {
                            this
                        } else {
                            break;
                        };

                        this.update(&mut cx, |this, cx| {
                            this.live_kit_room_updated(update, cx).log_err()
                        })
                        .ok();
                    }
                }
            });

            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            // After connecting, share the microphone unless the user is muted or deafened.
            // Errors from connecting or sharing are only logged.
            cx.spawn(|this, mut cx| async move {
                connect.await?;
                this.update(&mut cx, |this, cx| {
                    if this.can_use_microphone() {
                        if let Some(live_kit) = &this.live_kit {
                            if !live_kit.muted_by_user && !live_kit.deafened {
                                return this.share_microphone(cx);
                            }
                        }
                    }
                    Task::ready(Ok(()))
                })?
                .await
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: Self::mute_on_join(cx),
                deafened: false,
                speaking: false,
                _maintain_room,
                _handle_updates,
            })
        } else {
            None
        };

        // Watches the client's connection status and rejoins after disconnections; the task
        // is stored on the room so that `clear_state` can drop it.
        let maintain_connection = cx.spawn({
            let client = client.clone();
            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
        });

        Audio::play_sound(Sound::Joined, cx);

        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            client_subscriptions: vec![
                client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
            ],
            _subscriptions: vec![
                cx.on_release(Self::released),
                cx.on_app_quit(Self::app_will_quit),
            ],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
            room_update_completed_tx,
            room_update_completed_rx,
        }
    }
223
224 pub(crate) fn create(
225 called_user_id: u64,
226 initial_project: Option<Entity<Project>>,
227 client: Arc<Client>,
228 user_store: Entity<UserStore>,
229 cx: &mut App,
230 ) -> Task<Result<Entity<Self>>> {
231 cx.spawn(move |mut cx| async move {
232 let response = client.request(proto::CreateRoom {}).await?;
233 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
234 let room = cx.new(|cx| {
235 let mut room = Self::new(
236 room_proto.id,
237 None,
238 response.live_kit_connection_info,
239 client,
240 user_store,
241 cx,
242 );
243 if let Some(participant) = room_proto.participants.first() {
244 room.local_participant.role = participant.role()
245 }
246 room
247 })?;
248
249 let initial_project_id = if let Some(initial_project) = initial_project {
250 let initial_project_id = room
251 .update(&mut cx, |room, cx| {
252 room.share_project(initial_project.clone(), cx)
253 })?
254 .await?;
255 Some(initial_project_id)
256 } else {
257 None
258 };
259
260 let did_join = room
261 .update(&mut cx, |room, cx| {
262 room.leave_when_empty = true;
263 room.call(called_user_id, initial_project_id, cx)
264 })?
265 .await;
266 match did_join {
267 Ok(()) => Ok(room),
268 Err(error) => Err(error.context("room creation failed")),
269 }
270 })
271 }
272
273 pub(crate) async fn join_channel(
274 channel_id: ChannelId,
275 client: Arc<Client>,
276 user_store: Entity<UserStore>,
277 cx: AsyncApp,
278 ) -> Result<Entity<Self>> {
279 Self::from_join_response(
280 client
281 .request(proto::JoinChannel {
282 channel_id: channel_id.0,
283 })
284 .await?,
285 client,
286 user_store,
287 cx,
288 )
289 }
290
291 pub(crate) async fn join(
292 room_id: u64,
293 client: Arc<Client>,
294 user_store: Entity<UserStore>,
295 cx: AsyncApp,
296 ) -> Result<Entity<Self>> {
297 Self::from_join_response(
298 client.request(proto::JoinRoom { id: room_id }).await?,
299 client,
300 user_store,
301 cx,
302 )
303 }
304
305 fn released(&mut self, cx: &mut App) {
306 if self.status.is_online() {
307 self.leave_internal(cx).detach_and_log_err(cx);
308 }
309 }
310
311 fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
312 let task = if self.status.is_online() {
313 let leave = self.leave_internal(cx);
314 Some(cx.background_executor().spawn(async move {
315 leave.await.log_err();
316 }))
317 } else {
318 None
319 };
320
321 async move {
322 if let Some(task) = task {
323 task.await;
324 }
325 }
326 }
327
328 pub fn mute_on_join(cx: &App) -> bool {
329 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
330 }
331
332 fn from_join_response(
333 response: proto::JoinRoomResponse,
334 client: Arc<Client>,
335 user_store: Entity<UserStore>,
336 mut cx: AsyncApp,
337 ) -> Result<Entity<Self>> {
338 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
339 let room = cx.new(|cx| {
340 Self::new(
341 room_proto.id,
342 response.channel_id.map(ChannelId),
343 response.live_kit_connection_info,
344 client,
345 user_store,
346 cx,
347 )
348 })?;
349 room.update(&mut cx, |room, cx| {
350 room.leave_when_empty = room.channel_id.is_none();
351 room.apply_room_update(room_proto, cx)?;
352 anyhow::Ok(())
353 })??;
354 Ok(room)
355 }
356
357 fn should_leave(&self) -> bool {
358 self.leave_when_empty
359 && self.pending_room_update.is_none()
360 && self.pending_participants.is_empty()
361 && self.remote_participants.is_empty()
362 && self.pending_call_count == 0
363 }
364
    /// Notifies observers and leaves the room; see `leave_internal` for the actual work.
    ///
    /// The returned task resolves once the server has answered the leave request, or
    /// immediately with an error when the room is already offline.
    pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        cx.notify();
        self.leave_internal(cx)
    }
369
370 fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
371 if self.status.is_offline() {
372 return Task::ready(Err(anyhow!("room is offline")));
373 }
374
375 log::info!("leaving room");
376 Audio::play_sound(Sound::Leave, cx);
377
378 self.clear_state(cx);
379
380 let leave_room = self.client.request(proto::LeaveRoom {});
381 cx.background_executor().spawn(async move {
382 leave_room.await?;
383 anyhow::Ok(())
384 })
385 }
386
387 pub(crate) fn clear_state(&mut self, cx: &mut App) {
388 for project in self.shared_projects.drain() {
389 if let Some(project) = project.upgrade() {
390 project.update(cx, |project, cx| {
391 project.unshare(cx).log_err();
392 });
393 }
394 }
395 for project in self.joined_projects.drain() {
396 if let Some(project) = project.upgrade() {
397 project.update(cx, |project, cx| {
398 project.disconnected_from_host(cx);
399 project.close(cx);
400 });
401 }
402 }
403
404 self.status = RoomStatus::Offline;
405 self.remote_participants.clear();
406 self.pending_participants.clear();
407 self.participant_user_ids.clear();
408 self.client_subscriptions.clear();
409 self.live_kit.take();
410 self.pending_room_update.take();
411 self.maintain_connection.take();
412 }
413
    /// Watches the client's connection status and tries to rejoin the room after a
    /// disconnection.
    ///
    /// On every detected disconnection the room is marked `Rejoining`, then the function waits
    /// for the client to reconnect and calls `rejoin`. It gives up when `RECONNECT_TIMEOUT`
    /// elapses, when three rejoin attempts have failed, when the client is signed out, or when
    /// the room or app has been dropped. After giving up it leaves the room (if it still
    /// exists).
    ///
    /// # Errors
    ///
    /// This function never returns `Ok`: it keeps looping while the connection is healthy and
    /// returns an error once the room was dropped or reconnection failed.
    async fn maintain_connection(
        this: WeakEntity<Self>,
        client: Arc<Client>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Discard the currently buffered status so that `next()` below only resolves on a
            // subsequent change.
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            // NOTE(review): if the status stream ever ends while connected, `next()` yields
            // `None` and this loop repeats without waiting — confirm the stream cannot end.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    })?;

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout =
                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeded, and to `false` when rejoining
                    // is no longer possible or three attempts have failed.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade() else { break };
                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                    Ok(task) => {
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Biased: a finished reconnection is checked before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, leaving room");
            this.update(&mut cx, |this, cx| this.leave(cx))?.await?;
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
500
    /// Asks the server to put us back into this room after a reconnection.
    ///
    /// The `RejoinRoom` request lists every shared project and every joined project that is
    /// still alive and has a remote id. When the response arrives, the room is marked `Online`,
    /// the returned room state is applied, and each reshared / rejoined project is handed its
    /// part of the response (per-project failures are logged).
    ///
    /// # Errors
    ///
    /// The returned task fails if the request fails, the response has no room, the room entity
    /// is gone, or applying the room update fails.
    fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        // Projects by remote id, used to route the per-project parts of the response.
        let mut projects = HashMap::default();
        let mut reshared_projects = Vec::new();
        let mut rejoined_projects = Vec::new();
        // Shared projects are kept only if they are still alive and have a remote id.
        self.shared_projects.retain(|project| {
            if let Some(handle) = project.upgrade() {
                let project = handle.read(cx);
                if let Some(project_id) = project.remote_id() {
                    projects.insert(project_id, handle.clone());
                    reshared_projects.push(proto::UpdateProject {
                        project_id,
                        worktrees: project.worktree_metadata_protos(cx),
                    });
                    return true;
                }
            }
            false
        });
        // Joined projects are kept whenever they are still alive, even without a remote id
        // (in which case they are simply not included in the request).
        self.joined_projects.retain(|project| {
            if let Some(handle) = project.upgrade() {
                let project = handle.read(cx);
                if let Some(project_id) = project.remote_id() {
                    projects.insert(project_id, handle.clone());
                    rejoined_projects.push(proto::RejoinProject {
                        id: project_id,
                        worktrees: project
                            .worktrees(cx)
                            .map(|worktree| {
                                let worktree = worktree.read(cx);
                                proto::RejoinWorktree {
                                    id: worktree.id().to_proto(),
                                    scan_id: worktree.completed_scan_id() as u64,
                                }
                            })
                            .collect(),
                    });
                }
                return true;
            }
            false
        });

        let response = self.client.request_envelope(proto::RejoinRoom {
            id: self.id,
            reshared_projects,
            rejoined_projects,
        });

        cx.spawn(|this, mut cx| async move {
            let response = response.await?;
            // The envelope's message id is passed on to each rejoined project.
            let message_id = response.message_id;
            let response = response.payload;
            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
            this.update(&mut cx, |this, cx| {
                this.status = RoomStatus::Online;
                this.apply_room_update(room_proto, cx)?;

                for reshared_project in response.reshared_projects {
                    if let Some(project) = projects.get(&reshared_project.id) {
                        project.update(cx, |project, cx| {
                            project.reshared(reshared_project, cx).log_err();
                        });
                    }
                }

                for rejoined_project in response.rejoined_projects {
                    if let Some(project) = projects.get(&rejoined_project.id) {
                        project.update(cx, |project, cx| {
                            project.rejoined(rejoined_project, message_id, cx).log_err();
                        });
                    }
                }

                anyhow::Ok(())
            })?
        })
    }
578
    /// Returns the id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
582
    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
586
    /// Returns the state tracked for the local user in this room.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
590
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
594
595 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
596 self.remote_participants
597 .values()
598 .find(|p| p.peer_id == peer_id)
599 }
600
601 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
602 self.remote_participants
603 .get(&user_id)
604 .map(|participant| participant.role)
605 }
606
607 pub fn contains_guests(&self) -> bool {
608 self.local_participant.role == proto::ChannelRole::Guest
609 || self
610 .remote_participants
611 .values()
612 .any(|p| p.role == proto::ChannelRole::Guest)
613 }
614
615 pub fn local_participant_is_admin(&self) -> bool {
616 self.local_participant.role == proto::ChannelRole::Admin
617 }
618
619 pub fn local_participant_is_guest(&self) -> bool {
620 self.local_participant.role == proto::ChannelRole::Guest
621 }
622
623 pub fn set_participant_role(
624 &mut self,
625 user_id: u64,
626 role: proto::ChannelRole,
627 cx: &Context<Self>,
628 ) -> Task<Result<()>> {
629 let client = self.client.clone();
630 let room_id = self.id;
631 let role = role.into();
632 cx.spawn(|_, _| async move {
633 client
634 .request(proto::SetRoomParticipantRole {
635 room_id,
636 user_id,
637 role,
638 })
639 .await
640 .map(|_| ())
641 })
642 }
643
    /// Returns the users listed as pending participants in the most recently applied room
    /// update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
647
    /// Returns `true` when `user_id` belongs to a remote or pending participant of this room.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
651
652 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
653 self.follows_by_leader_id_project_id
654 .get(&(leader_id, project_id))
655 .map_or(&[], |v| v.as_slice())
656 }
657
658 /// Returns the most 'active' projects, defined as most people in the project
659 pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
660 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
661 for participant in self.remote_participants.values() {
662 match participant.location {
663 ParticipantLocation::SharedProject { project_id } => {
664 project_hosts_and_guest_counts
665 .entry(project_id)
666 .or_default()
667 .1 += 1;
668 }
669 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
670 }
671 for project in &participant.projects {
672 project_hosts_and_guest_counts
673 .entry(project.id)
674 .or_default()
675 .0 = Some(participant.user.id);
676 }
677 }
678
679 if let Some(user) = self.user_store.read(cx).current_user() {
680 for project in &self.local_participant.projects {
681 project_hosts_and_guest_counts
682 .entry(project.id)
683 .or_default()
684 .0 = Some(user.id);
685 }
686 }
687
688 project_hosts_and_guest_counts
689 .into_iter()
690 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
691 .max_by_key(|(_, _, guest_count)| *guest_count)
692 .map(|(id, host, _)| (id, host))
693 }
694
695 async fn handle_room_updated(
696 this: Entity<Self>,
697 envelope: TypedEnvelope<proto::RoomUpdated>,
698 mut cx: AsyncApp,
699 ) -> Result<()> {
700 let room = envelope
701 .payload
702 .room
703 .ok_or_else(|| anyhow!("invalid room"))?;
704 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
705 }
706
    /// Replaces the room's participant and follower state with the contents of `room`.
    ///
    /// The user records for remote and pending participants are fetched first; the update is
    /// then applied in a task stored in `pending_room_update`, which replaces (and thereby
    /// drops) any update still in flight. The task:
    ///
    /// - updates the local participant's projects and role, unsharing everything and stopping
    ///   LiveKit publishing when the role becomes `Guest`;
    /// - inserts or updates remote participants, emitting `RemoteProjectShared`,
    ///   `RemoteProjectUnshared` and `ParticipantLocationChanged` events as appropriate, and
    ///   drops participants that are no longer listed;
    /// - replaces the pending participants and rebuilds the follower map;
    /// - leaves the room when `should_leave` holds;
    /// - publishes participant indices to the user store and signals completion through
    ///   `room_update_completed_tx`.
    ///
    /// This function itself always returns `Ok(())`; failures to load users are logged inside
    /// the task and the corresponding part of the update is skipped.
    fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Kick off loading the user records for both groups.
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    let role = participant.role();
                    this.local_participant.projects = participant.projects;
                    if this.local_participant.role != role {
                        this.local_participant.role = role;

                        // When the new role is `Guest`, everything shared so far is unshared
                        // and LiveKit publishing is stopped.
                        if role == proto::ChannelRole::Guest {
                            for project in mem::take(&mut this.shared_projects) {
                                if let Some(project) = project.upgrade() {
                                    this.unshare_project(project, cx).log_err();
                                }
                            }
                            this.local_participant.projects.clear();
                            if let Some(live_kit_room) = &mut this.live_kit {
                                live_kit_room.stop_publishing(cx);
                            }
                        }

                        // Propagate the new role to joined projects, pruning dropped ones.
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade() {
                                project.update(cx, |project, cx| project.set_role(role, cx));
                                true
                            } else {
                                false
                            }
                        });
                    }
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): pairing by position assumes `get_users` returns users in
                    // the same order as the requested ids — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        // Participants without a peer id are skipped entirely.
                        let Some(peer_id) = participant.peer_id else {
                            continue;
                        };
                        let participant_index = ParticipantIndex(participant.participant_index);
                        this.participant_user_ids.insert(participant.user_id);

                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        // Announce projects this participant did not have before.
                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        // For projects that disappeared: disconnect and forget any joined
                        // project with that remote id, then announce the unshare.
                        for unshared_project_id in old_projects.difference(&new_projects) {
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade() {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        let role = participant.role();
                        // Locations that cannot be decoded are treated as `External`.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            remote_participant.peer_id = peer_id;
                            remote_participant.projects = participant.projects;
                            remote_participant.participant_index = participant_index;
                            if location != remote_participant.location
                                || role != remote_participant.role
                            {
                                remote_participant.location = location;
                                remote_participant.role = role;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            // New participant: starts out muted with no tracks.
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    participant_index,
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    role,
                                    muted: true,
                                    speaking: false,
                                    video_tracks: Default::default(),
                                    audio_tracks: Default::default(),
                                },
                            );

                            Audio::play_sound(Sound::Joined, cx);

                            // Replay the tracks LiveKit already holds for this user as
                            // subscription updates, now that the participant entry exists.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let video_tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                let audio_tracks =
                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
                                let publications = live_kit
                                    .room
                                    .remote_audio_track_publications(&user.id.to_string());

                                for track in video_tracks {
                                    this.live_kit_room_updated(
                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
                                        cx,
                                    )
                                    .log_err();
                                }

                                for (track, publication) in
                                    audio_tracks.iter().zip(publications.iter())
                                {
                                    this.live_kit_room_updated(
                                        RoomUpdate::SubscribedToRemoteAudioTrack(
                                            track.clone(),
                                            publication.clone(),
                                        ),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants missing from this update, announcing their projects
                    // as unshared.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower map from scratch, skipping incomplete entries and
                // duplicate followers.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_default();
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the pending update before `should_leave`, which requires that no
                // update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    this.leave(cx).detach();
                }

                this.user_store.update(cx, |user_store, cx| {
                    let participant_indices_by_user_id = this
                        .remote_participants
                        .iter()
                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
                        .collect();
                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
                });

                this.check_invariants();
                // Wake up futures returned by `room_update_completed`.
                this.room_update_completed_tx.try_send(Some(())).ok();
                cx.notify();
            })
            .ok();
        }));

        cx.notify();
        Ok(())
    }
961
962 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
963 let mut done_rx = self.room_update_completed_rx.clone();
964 async move {
965 while let Some(result) = done_rx.next().await {
966 if result.is_some() {
967 break;
968 }
969 }
970 }
971 }
972
    /// Applies a single LiveKit `RoomUpdate` to the room state and notifies observers.
    ///
    /// Track subscriptions and unsubscriptions update the owning remote participant and emit
    /// `RemoteVideoTracksChanged` / `RemoteAudioTracksChanged`; speaker and mute changes update
    /// flags only; local unpublish events reset the corresponding local track.
    ///
    /// # Errors
    ///
    /// Fails when a publisher id cannot be parsed as a user id or refers to a participant
    /// that is not in `remote_participants`. In that case observers are not notified.
    fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
        match update {
            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
                // The publisher id is parsed as the key used in `remote_participants`.
                let user_id = track.publisher_id().parse()?;
                let track_id = track.sid().to_string();
                let participant = self
                    .remote_participants
                    .get_mut(&user_id)
                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
                participant.video_tracks.insert(track_id.clone(), track);
                cx.emit(Event::RemoteVideoTracksChanged {
                    participant_id: participant.peer_id,
                });
            }

            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
                publisher_id,
                track_id,
            } => {
                let user_id = publisher_id.parse()?;
                let participant = self
                    .remote_participants
                    .get_mut(&user_id)
                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
                participant.video_tracks.remove(&track_id);
                cx.emit(Event::RemoteVideoTracksChanged {
                    participant_id: participant.peer_id,
                });
            }

            RoomUpdate::ActiveSpeakersChanged { speakers } => {
                // Speaker ids that do not parse as `u64` are ignored; the list is sorted so
                // that membership can be tested with a binary search.
                let mut speaker_ids = speakers
                    .into_iter()
                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
                    .collect::<Vec<u64>>();
                speaker_ids.sort_unstable();
                for (sid, participant) in &mut self.remote_participants {
                    participant.speaking = speaker_ids.binary_search(sid).is_ok();
                }
                if let Some(id) = self.client.user_id() {
                    if let Some(room) = &mut self.live_kit {
                        room.speaking = speaker_ids.binary_search(&id).is_ok();
                    }
                }
            }

            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
                // Update the first participant owning an audio track with this id, if any.
                let mut found = false;
                for participant in &mut self.remote_participants.values_mut() {
                    for track in participant.audio_tracks.values() {
                        if track.sid() == track_id {
                            found = true;
                            break;
                        }
                    }
                    if found {
                        participant.muted = muted;
                        break;
                    }
                }
            }

            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
                // While deafened, newly subscribed audio is stopped and its publication
                // disabled right away.
                if let Some(live_kit) = &self.live_kit {
                    if live_kit.deafened {
                        track.stop();
                        cx.foreground_executor()
                            .spawn(publication.set_enabled(false))
                            .detach();
                    }
                }

                let user_id = track.publisher_id().parse()?;
                let track_id = track.sid().to_string();
                let participant = self
                    .remote_participants
                    .get_mut(&user_id)
                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
                participant.audio_tracks.insert(track_id.clone(), track);
                participant.muted = publication.is_muted();

                cx.emit(Event::RemoteAudioTracksChanged {
                    participant_id: participant.peer_id,
                });
            }

            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
                publisher_id,
                track_id,
            } => {
                let user_id = publisher_id.parse()?;
                let participant = self
                    .remote_participants
                    .get_mut(&user_id)
                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
                participant.audio_tracks.remove(&track_id);
                cx.emit(Event::RemoteAudioTracksChanged {
                    participant_id: participant.peer_id,
                });
            }

            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
                log::info!("unpublished audio track {}", publication.sid());
                if let Some(room) = &mut self.live_kit {
                    room.microphone_track = LocalTrack::None;
                }
            }

            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
                log::info!("unpublished video track {}", publication.sid());
                if let Some(room) = &mut self.live_kit {
                    room.screen_track = LocalTrack::None;
                }
            }

            // Local publish events are only logged.
            RoomUpdate::LocalAudioTrackPublished { publication } => {
                log::info!("published audio track {}", publication.sid());
            }

            RoomUpdate::LocalVideoTrackPublished { publication } => {
                log::info!("published video track {}", publication.sid());
            }
        }

        cx.notify();
        Ok(())
    }
1100
    /// Asserts the consistency of the participant bookkeeping.
    ///
    /// The checks are compiled only for tests and the `test-support` feature; otherwise this
    /// function does nothing. It panics when a check fails, or when there is at least one
    /// participant and the client has no user id.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked by id and is never the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for pending participants.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // The id set holds exactly the remote and pending participants, without overlap.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
1120
1121 pub(crate) fn call(
1122 &mut self,
1123 called_user_id: u64,
1124 initial_project_id: Option<u64>,
1125 cx: &mut Context<Self>,
1126 ) -> Task<Result<()>> {
1127 if self.status.is_offline() {
1128 return Task::ready(Err(anyhow!("room is offline")));
1129 }
1130
1131 cx.notify();
1132 let client = self.client.clone();
1133 let room_id = self.id;
1134 self.pending_call_count += 1;
1135 cx.spawn(move |this, mut cx| async move {
1136 let result = client
1137 .request(proto::Call {
1138 room_id,
1139 called_user_id,
1140 initial_project_id,
1141 })
1142 .await;
1143 this.update(&mut cx, |this, cx| {
1144 this.pending_call_count -= 1;
1145 if this.should_leave() {
1146 this.leave(cx).detach_and_log_err(cx);
1147 }
1148 })?;
1149 result?;
1150 Ok(())
1151 })
1152 }
1153
1154 pub fn join_project(
1155 &mut self,
1156 id: u64,
1157 language_registry: Arc<LanguageRegistry>,
1158 fs: Arc<dyn Fs>,
1159 cx: &mut Context<Self>,
1160 ) -> Task<Result<Entity<Project>>> {
1161 let client = self.client.clone();
1162 let user_store = self.user_store.clone();
1163 cx.emit(Event::RemoteProjectJoined { project_id: id });
1164 cx.spawn(move |this, mut cx| async move {
1165 let project =
1166 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1167
1168 this.update(&mut cx, |this, cx| {
1169 this.joined_projects.retain(|project| {
1170 if let Some(project) = project.upgrade() {
1171 !project.read(cx).is_disconnected(cx)
1172 } else {
1173 false
1174 }
1175 });
1176 this.joined_projects.insert(project.downgrade());
1177 })?;
1178 Ok(project)
1179 })
1180 }
1181
1182 pub fn share_project(
1183 &mut self,
1184 project: Entity<Project>,
1185 cx: &mut Context<Self>,
1186 ) -> Task<Result<u64>> {
1187 if let Some(project_id) = project.read(cx).remote_id() {
1188 return Task::ready(Ok(project_id));
1189 }
1190
1191 let request = self.client.request(proto::ShareProject {
1192 room_id: self.id(),
1193 worktrees: project.read(cx).worktree_metadata_protos(cx),
1194 is_ssh_project: project.read(cx).is_via_ssh(),
1195 });
1196
1197 cx.spawn(|this, mut cx| async move {
1198 let response = request.await?;
1199
1200 project.update(&mut cx, |project, cx| {
1201 project.shared(response.project_id, cx)
1202 })??;
1203
1204 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1205 this.update(&mut cx, |this, cx| {
1206 this.shared_projects.insert(project.downgrade());
1207 let active_project = this.local_participant.active_project.as_ref();
1208 if active_project.map_or(false, |location| *location == project) {
1209 this.set_location(Some(&project), cx)
1210 } else {
1211 Task::ready(Ok(()))
1212 }
1213 })?
1214 .await?;
1215
1216 Ok(response.project_id)
1217 })
1218 }
1219
1220 pub(crate) fn unshare_project(
1221 &mut self,
1222 project: Entity<Project>,
1223 cx: &mut Context<Self>,
1224 ) -> Result<()> {
1225 let project_id = match project.read(cx).remote_id() {
1226 Some(project_id) => project_id,
1227 None => return Ok(()),
1228 };
1229
1230 self.client.send(proto::UnshareProject { project_id })?;
1231 project.update(cx, |this, cx| this.unshare(cx))?;
1232
1233 if self.local_participant.active_project == Some(project.downgrade()) {
1234 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1235 }
1236 Ok(())
1237 }
1238
1239 pub(crate) fn set_location(
1240 &mut self,
1241 project: Option<&Entity<Project>>,
1242 cx: &mut Context<Self>,
1243 ) -> Task<Result<()>> {
1244 if self.status.is_offline() {
1245 return Task::ready(Err(anyhow!("room is offline")));
1246 }
1247
1248 let client = self.client.clone();
1249 let room_id = self.id;
1250 let location = if let Some(project) = project {
1251 self.local_participant.active_project = Some(project.downgrade());
1252 if let Some(project_id) = project.read(cx).remote_id() {
1253 proto::participant_location::Variant::SharedProject(
1254 proto::participant_location::SharedProject { id: project_id },
1255 )
1256 } else {
1257 proto::participant_location::Variant::UnsharedProject(
1258 proto::participant_location::UnsharedProject {},
1259 )
1260 }
1261 } else {
1262 self.local_participant.active_project = None;
1263 proto::participant_location::Variant::External(proto::participant_location::External {})
1264 };
1265
1266 cx.notify();
1267 cx.background_executor().spawn(async move {
1268 client
1269 .request(proto::UpdateParticipantLocation {
1270 room_id,
1271 location: Some(proto::ParticipantLocation {
1272 variant: Some(location),
1273 }),
1274 })
1275 .await?;
1276 Ok(())
1277 })
1278 }
1279
1280 pub fn is_screen_sharing(&self) -> bool {
1281 self.live_kit.as_ref().map_or(false, |live_kit| {
1282 !matches!(live_kit.screen_track, LocalTrack::None)
1283 })
1284 }
1285
1286 pub fn is_sharing_mic(&self) -> bool {
1287 self.live_kit.as_ref().map_or(false, |live_kit| {
1288 !matches!(live_kit.microphone_track, LocalTrack::None)
1289 })
1290 }
1291
1292 pub fn is_muted(&self) -> bool {
1293 self.live_kit.as_ref().map_or(false, |live_kit| {
1294 matches!(live_kit.microphone_track, LocalTrack::None)
1295 || live_kit.muted_by_user
1296 || live_kit.deafened
1297 })
1298 }
1299
1300 pub fn muted_by_user(&self) -> bool {
1301 self.live_kit
1302 .as_ref()
1303 .map_or(false, |live_kit| live_kit.muted_by_user)
1304 }
1305
1306 pub fn is_speaking(&self) -> bool {
1307 self.live_kit
1308 .as_ref()
1309 .map_or(false, |live_kit| live_kit.speaking)
1310 }
1311
1312 pub fn is_deafened(&self) -> Option<bool> {
1313 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1314 }
1315
1316 pub fn can_use_microphone(&self) -> bool {
1317 use proto::ChannelRole::*;
1318 match self.local_participant.role {
1319 Admin | Member | Talker => true,
1320 Guest | Banned => false,
1321 }
1322 }
1323
1324 pub fn can_share_projects(&self) -> bool {
1325 use proto::ChannelRole::*;
1326 match self.local_participant.role {
1327 Admin | Member => true,
1328 Guest | Banned | Talker => false,
1329 }
1330 }
1331
1332 #[track_caller]
1333 pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1334 if self.status.is_offline() {
1335 return Task::ready(Err(anyhow!("room is offline")));
1336 }
1337
1338 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1339 let publish_id = post_inc(&mut live_kit.next_publish_id);
1340 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1341 cx.notify();
1342 publish_id
1343 } else {
1344 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1345 };
1346
1347 cx.spawn(move |this, mut cx| async move {
1348 let publish_track = async {
1349 let track = LocalAudioTrack::create();
1350 this.upgrade()
1351 .ok_or_else(|| anyhow!("room was dropped"))?
1352 .update(&mut cx, |this, _| {
1353 this.live_kit
1354 .as_ref()
1355 .map(|live_kit| live_kit.room.publish_audio_track(track))
1356 })?
1357 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1358 .await
1359 };
1360 let publication = publish_track.await;
1361 this.upgrade()
1362 .ok_or_else(|| anyhow!("room was dropped"))?
1363 .update(&mut cx, |this, cx| {
1364 let live_kit = this
1365 .live_kit
1366 .as_mut()
1367 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1368
1369 let canceled = if let LocalTrack::Pending {
1370 publish_id: cur_publish_id,
1371 } = &live_kit.microphone_track
1372 {
1373 *cur_publish_id != publish_id
1374 } else {
1375 true
1376 };
1377
1378 match publication {
1379 Ok(publication) => {
1380 if canceled {
1381 live_kit.room.unpublish_track(publication);
1382 } else {
1383 if live_kit.muted_by_user || live_kit.deafened {
1384 cx.background_executor()
1385 .spawn(publication.set_mute(true))
1386 .detach();
1387 }
1388 live_kit.microphone_track = LocalTrack::Published {
1389 track_publication: publication,
1390 };
1391 cx.notify();
1392 }
1393 Ok(())
1394 }
1395 Err(error) => {
1396 if canceled {
1397 Ok(())
1398 } else {
1399 live_kit.microphone_track = LocalTrack::None;
1400 cx.notify();
1401 Err(error)
1402 }
1403 }
1404 }
1405 })?
1406 })
1407 }
1408
1409 pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1410 if self.status.is_offline() {
1411 return Task::ready(Err(anyhow!("room is offline")));
1412 } else if self.is_screen_sharing() {
1413 return Task::ready(Err(anyhow!("screen was already shared")));
1414 }
1415
1416 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1417 let publish_id = post_inc(&mut live_kit.next_publish_id);
1418 live_kit.screen_track = LocalTrack::Pending { publish_id };
1419 cx.notify();
1420 (live_kit.room.display_sources(), publish_id)
1421 } else {
1422 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1423 };
1424
1425 cx.spawn(move |this, mut cx| async move {
1426 let publish_track = async {
1427 let displays = displays.await?;
1428 let display = displays
1429 .first()
1430 .ok_or_else(|| anyhow!("no display found"))?;
1431 let track = LocalVideoTrack::screen_share_for_display(display);
1432 this.upgrade()
1433 .ok_or_else(|| anyhow!("room was dropped"))?
1434 .update(&mut cx, |this, _| {
1435 this.live_kit
1436 .as_ref()
1437 .map(|live_kit| live_kit.room.publish_video_track(track))
1438 })?
1439 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1440 .await
1441 };
1442
1443 let publication = publish_track.await;
1444 this.upgrade()
1445 .ok_or_else(|| anyhow!("room was dropped"))?
1446 .update(&mut cx, |this, cx| {
1447 let live_kit = this
1448 .live_kit
1449 .as_mut()
1450 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1451
1452 let canceled = if let LocalTrack::Pending {
1453 publish_id: cur_publish_id,
1454 } = &live_kit.screen_track
1455 {
1456 *cur_publish_id != publish_id
1457 } else {
1458 true
1459 };
1460
1461 match publication {
1462 Ok(publication) => {
1463 if canceled {
1464 live_kit.room.unpublish_track(publication);
1465 } else {
1466 live_kit.screen_track = LocalTrack::Published {
1467 track_publication: publication,
1468 };
1469 cx.notify();
1470 }
1471
1472 Audio::play_sound(Sound::StartScreenshare, cx);
1473
1474 Ok(())
1475 }
1476 Err(error) => {
1477 if canceled {
1478 Ok(())
1479 } else {
1480 live_kit.screen_track = LocalTrack::None;
1481 cx.notify();
1482 Err(error)
1483 }
1484 }
1485 }
1486 })?
1487 })
1488 }
1489
1490 pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1491 if let Some(live_kit) = self.live_kit.as_mut() {
1492 // When unmuting, undeafen if the user was deafened before.
1493 let was_deafened = live_kit.deafened;
1494 if live_kit.muted_by_user
1495 || live_kit.deafened
1496 || matches!(live_kit.microphone_track, LocalTrack::None)
1497 {
1498 live_kit.muted_by_user = false;
1499 live_kit.deafened = false;
1500 } else {
1501 live_kit.muted_by_user = true;
1502 }
1503 let muted = live_kit.muted_by_user;
1504 let should_undeafen = was_deafened && !live_kit.deafened;
1505
1506 if let Some(task) = self.set_mute(muted, cx) {
1507 task.detach_and_log_err(cx);
1508 }
1509
1510 if should_undeafen {
1511 if let Some(task) = self.set_deafened(false, cx) {
1512 task.detach_and_log_err(cx);
1513 }
1514 }
1515 }
1516 }
1517
1518 pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1519 if let Some(live_kit) = self.live_kit.as_mut() {
1520 // When deafening, mute the microphone if it was not already muted.
1521 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1522 let deafened = !live_kit.deafened;
1523 live_kit.deafened = deafened;
1524 let should_change_mute = !live_kit.muted_by_user;
1525
1526 if let Some(task) = self.set_deafened(deafened, cx) {
1527 task.detach_and_log_err(cx);
1528 }
1529
1530 if should_change_mute {
1531 if let Some(task) = self.set_mute(deafened, cx) {
1532 task.detach_and_log_err(cx);
1533 }
1534 }
1535 }
1536 }
1537
1538 pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1539 if self.status.is_offline() {
1540 return Err(anyhow!("room is offline"));
1541 }
1542
1543 let live_kit = self
1544 .live_kit
1545 .as_mut()
1546 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1547 match mem::take(&mut live_kit.screen_track) {
1548 LocalTrack::None => Err(anyhow!("screen was not shared")),
1549 LocalTrack::Pending { .. } => {
1550 cx.notify();
1551 Ok(())
1552 }
1553 LocalTrack::Published {
1554 track_publication, ..
1555 } => {
1556 live_kit.room.unpublish_track(track_publication);
1557 cx.notify();
1558
1559 Audio::play_sound(Sound::StopScreenshare, cx);
1560 Ok(())
1561 }
1562 }
1563 }
1564
1565 fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1566 let live_kit = self.live_kit.as_mut()?;
1567 cx.notify();
1568
1569 let mut track_updates = Vec::new();
1570 for participant in self.remote_participants.values() {
1571 for publication in live_kit
1572 .room
1573 .remote_audio_track_publications(&participant.user.id.to_string())
1574 {
1575 track_updates.push(publication.set_enabled(!deafened));
1576 }
1577
1578 for track in participant.audio_tracks.values() {
1579 if deafened {
1580 track.stop();
1581 } else {
1582 track.start();
1583 }
1584 }
1585 }
1586
1587 Some(cx.foreground_executor().spawn(async move {
1588 for result in futures::future::join_all(track_updates).await {
1589 result?;
1590 }
1591 Ok(())
1592 }))
1593 }
1594
1595 fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1596 let live_kit = self.live_kit.as_mut()?;
1597 cx.notify();
1598
1599 if should_mute {
1600 Audio::play_sound(Sound::Mute, cx);
1601 } else {
1602 Audio::play_sound(Sound::Unmute, cx);
1603 }
1604
1605 match &mut live_kit.microphone_track {
1606 LocalTrack::None => {
1607 if should_mute {
1608 None
1609 } else {
1610 Some(self.share_microphone(cx))
1611 }
1612 }
1613 LocalTrack::Pending { .. } => None,
1614 LocalTrack::Published { track_publication } => Some(
1615 cx.foreground_executor()
1616 .spawn(track_publication.set_mute(should_mute)),
1617 ),
1618 }
1619 }
1620
1621 #[cfg(any(test, feature = "test-support"))]
1622 pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
1623 self.live_kit
1624 .as_ref()
1625 .unwrap()
1626 .room
1627 .set_display_sources(sources);
1628 }
1629}
1630
/// Live-kit state held by a [`Room`]: the underlying live-kit room handle plus the local
/// tracks published through it and the local mute/deafen flags.
struct LiveKitRoom {
    /// Handle to the underlying live-kit room used to publish and unpublish tracks.
    room: Arc<livekit_client_macos::Room>,
    /// Slot for the local screen-share track.
    screen_track: LocalTrack,
    /// Slot for the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the user explicitly muted the microphone. Muting that is merely implied by
    /// deafening is not recorded here: `Room::toggle_deafen` leaves this flag untouched
    /// and `Room::is_muted` consults `deafened` separately.
    muted_by_user: bool,
    /// Whether the local user is deafened, i.e. remote audio is disabled.
    deafened: bool,
    /// Value reported by `Room::is_speaking`.
    speaking: bool,
    /// Next id handed out to a publish attempt; used to detect attempts that were
    /// superseded or canceled while still pending.
    next_publish_id: usize,
    // NOTE(review): these tasks are never read; presumably they are stored only so the
    // background work lives as long as this struct — confirm.
    _maintain_room: Task<()>,
    _handle_updates: Task<()>,
}
1643
1644impl LiveKitRoom {
1645 fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1646 if let LocalTrack::Published {
1647 track_publication, ..
1648 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1649 {
1650 self.room.unpublish_track(track_publication);
1651 cx.notify();
1652 }
1653
1654 if let LocalTrack::Published {
1655 track_publication, ..
1656 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1657 {
1658 self.room.unpublish_track(track_publication);
1659 cx.notify();
1660 }
1661 }
1662}
1663
1664enum LocalTrack {
1665 None,
1666 Pending {
1667 publish_id: usize,
1668 },
1669 Published {
1670 track_publication: LocalTrackPublication,
1671 },
1672}
1673
1674impl Default for LocalTrack {
1675 fn default() -> Self {
1676 Self::None
1677 }
1678}
1679
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; a rejoining room is not online.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}