1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
15use language::LanguageRegistry;
16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
17use postage::{sink::Sink, stream::Stream, watch};
18use project::Project;
19use settings::Settings as _;
20use std::{future::Future, mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub enum Event {
27 RoomJoined {
28 channel_id: Option<ChannelId>,
29 },
30 ParticipantLocationChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteVideoTracksChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteAudioTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteProjectShared {
40 owner: Arc<User>,
41 project_id: u64,
42 worktree_root_names: Vec<String>,
43 },
44 RemoteProjectUnshared {
45 project_id: u64,
46 },
47 RemoteProjectJoined {
48 project_id: u64,
49 },
50 RemoteProjectInvitationDiscarded {
51 project_id: u64,
52 },
53 RoomLeft {
54 channel_id: Option<ChannelId>,
55 },
56}
57
58pub struct Room {
59 id: u64,
60 channel_id: Option<ChannelId>,
61 live_kit: Option<LiveKitRoom>,
62 status: RoomStatus,
63 shared_projects: HashSet<WeakEntity<Project>>,
64 joined_projects: HashSet<WeakEntity<Project>>,
65 local_participant: LocalParticipant,
66 remote_participants: BTreeMap<u64, RemoteParticipant>,
67 pending_participants: Vec<Arc<User>>,
68 participant_user_ids: HashSet<u64>,
69 pending_call_count: usize,
70 leave_when_empty: bool,
71 client: Arc<Client>,
72 user_store: Entity<UserStore>,
73 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
74 client_subscriptions: Vec<client::Subscription>,
75 _subscriptions: Vec<gpui::Subscription>,
76 room_update_completed_tx: watch::Sender<Option<()>>,
77 room_update_completed_rx: watch::Receiver<Option<()>>,
78 pending_room_update: Option<Task<()>>,
79 maintain_connection: Option<Task<Option<()>>>,
80}
81
82impl EventEmitter<Event> for Room {}
83
84impl Room {
85 pub fn channel_id(&self) -> Option<ChannelId> {
86 self.channel_id
87 }
88
89 pub fn is_sharing_project(&self) -> bool {
90 !self.shared_projects.is_empty()
91 }
92
/// Test helper: reports whether the LiveKit room exists and its current
/// connection state is `Connected`.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    self.live_kit.as_ref().map_or(false, |live_kit| {
        matches!(
            *live_kit.room.status().borrow(),
            livekit_client_macos::ConnectionState::Connected { .. }
        )
    })
}
104
105 fn new(
106 id: u64,
107 channel_id: Option<ChannelId>,
108 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
109 client: Arc<Client>,
110 user_store: Entity<UserStore>,
111 cx: &mut Context<Self>,
112 ) -> Self {
113 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
114 let room = livekit_client_macos::Room::new();
115 let mut status = room.status();
116 // Consume the initial status of the room.
117 let _ = status.try_recv();
118 let _maintain_room = cx.spawn(|this, mut cx| async move {
119 while let Some(status) = status.next().await {
120 let this = if let Some(this) = this.upgrade() {
121 this
122 } else {
123 break;
124 };
125
126 if status == livekit_client_macos::ConnectionState::Disconnected {
127 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
128 .ok();
129 break;
130 }
131 }
132 });
133
134 let _handle_updates = cx.spawn({
135 let room = room.clone();
136 move |this, mut cx| async move {
137 let mut updates = room.updates();
138 while let Some(update) = updates.next().await {
139 let this = if let Some(this) = this.upgrade() {
140 this
141 } else {
142 break;
143 };
144
145 this.update(&mut cx, |this, cx| {
146 this.live_kit_room_updated(update, cx).log_err()
147 })
148 .ok();
149 }
150 }
151 });
152
153 let connect = room.connect(&connection_info.server_url, &connection_info.token);
154 cx.spawn(|this, mut cx| async move {
155 connect.await?;
156 this.update(&mut cx, |this, cx| {
157 if this.can_use_microphone() {
158 if let Some(live_kit) = &this.live_kit {
159 if !live_kit.muted_by_user && !live_kit.deafened {
160 return this.share_microphone(cx);
161 }
162 }
163 }
164 Task::ready(Ok(()))
165 })?
166 .await
167 })
168 .detach_and_log_err(cx);
169
170 Some(LiveKitRoom {
171 room,
172 screen_track: LocalTrack::None,
173 microphone_track: LocalTrack::None,
174 next_publish_id: 0,
175 muted_by_user: Self::mute_on_join(cx),
176 deafened: false,
177 speaking: false,
178 _maintain_room,
179 _handle_updates,
180 })
181 } else {
182 None
183 };
184
185 let maintain_connection = cx.spawn({
186 let client = client.clone();
187 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
188 });
189
190 Audio::play_sound(Sound::Joined, cx);
191
192 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
193
194 Self {
195 id,
196 channel_id,
197 live_kit: live_kit_room,
198 status: RoomStatus::Online,
199 shared_projects: Default::default(),
200 joined_projects: Default::default(),
201 participant_user_ids: Default::default(),
202 local_participant: Default::default(),
203 remote_participants: Default::default(),
204 pending_participants: Default::default(),
205 pending_call_count: 0,
206 client_subscriptions: vec![
207 client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
208 ],
209 _subscriptions: vec![
210 cx.on_release(Self::released),
211 cx.on_app_quit(Self::app_will_quit),
212 ],
213 leave_when_empty: false,
214 pending_room_update: None,
215 client,
216 user_store,
217 follows_by_leader_id_project_id: Default::default(),
218 maintain_connection: Some(maintain_connection),
219 room_update_completed_tx,
220 room_update_completed_rx,
221 }
222 }
223
224 pub(crate) fn create(
225 called_user_id: u64,
226 initial_project: Option<Entity<Project>>,
227 client: Arc<Client>,
228 user_store: Entity<UserStore>,
229 cx: &mut App,
230 ) -> Task<Result<Entity<Self>>> {
231 cx.spawn(move |mut cx| async move {
232 let response = client.request(proto::CreateRoom {}).await?;
233 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
234 let room = cx.new(|cx| {
235 let mut room = Self::new(
236 room_proto.id,
237 None,
238 response.live_kit_connection_info,
239 client,
240 user_store,
241 cx,
242 );
243 if let Some(participant) = room_proto.participants.first() {
244 room.local_participant.role = participant.role()
245 }
246 room
247 })?;
248
249 let initial_project_id = if let Some(initial_project) = initial_project {
250 let initial_project_id = room
251 .update(&mut cx, |room, cx| {
252 room.share_project(initial_project.clone(), cx)
253 })?
254 .await?;
255 Some(initial_project_id)
256 } else {
257 None
258 };
259
260 let did_join = room
261 .update(&mut cx, |room, cx| {
262 room.leave_when_empty = true;
263 room.call(called_user_id, initial_project_id, cx)
264 })?
265 .await;
266 match did_join {
267 Ok(()) => Ok(room),
268 Err(error) => Err(error.context("room creation failed")),
269 }
270 })
271 }
272
273 pub(crate) async fn join_channel(
274 channel_id: ChannelId,
275 client: Arc<Client>,
276 user_store: Entity<UserStore>,
277 cx: AsyncApp,
278 ) -> Result<Entity<Self>> {
279 Self::from_join_response(
280 client
281 .request(proto::JoinChannel {
282 channel_id: channel_id.0,
283 })
284 .await?,
285 client,
286 user_store,
287 cx,
288 )
289 }
290
291 pub(crate) async fn join(
292 room_id: u64,
293 client: Arc<Client>,
294 user_store: Entity<UserStore>,
295 cx: AsyncApp,
296 ) -> Result<Entity<Self>> {
297 Self::from_join_response(
298 client.request(proto::JoinRoom { id: room_id }).await?,
299 client,
300 user_store,
301 cx,
302 )
303 }
304
305 fn released(&mut self, cx: &mut App) {
306 if self.status.is_online() {
307 self.leave_internal(cx).detach_and_log_err(cx);
308 }
309 }
310
311 fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
312 let task = if self.status.is_online() {
313 let leave = self.leave_internal(cx);
314 Some(cx.background_spawn(async move {
315 leave.await.log_err();
316 }))
317 } else {
318 None
319 };
320
321 async move {
322 if let Some(task) = task {
323 task.await;
324 }
325 }
326 }
327
328 pub fn mute_on_join(cx: &App) -> bool {
329 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
330 }
331
332 fn from_join_response(
333 response: proto::JoinRoomResponse,
334 client: Arc<Client>,
335 user_store: Entity<UserStore>,
336 mut cx: AsyncApp,
337 ) -> Result<Entity<Self>> {
338 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
339 let room = cx.new(|cx| {
340 Self::new(
341 room_proto.id,
342 response.channel_id.map(ChannelId),
343 response.live_kit_connection_info,
344 client,
345 user_store,
346 cx,
347 )
348 })?;
349 room.update(&mut cx, |room, cx| {
350 room.leave_when_empty = room.channel_id.is_none();
351 room.apply_room_update(room_proto, cx)?;
352 anyhow::Ok(())
353 })??;
354 Ok(room)
355 }
356
357 fn should_leave(&self) -> bool {
358 self.leave_when_empty
359 && self.pending_room_update.is_none()
360 && self.pending_participants.is_empty()
361 && self.remote_participants.is_empty()
362 && self.pending_call_count == 0
363 }
364
365 pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
366 cx.notify();
367 self.leave_internal(cx)
368 }
369
370 fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
371 if self.status.is_offline() {
372 return Task::ready(Err(anyhow!("room is offline")));
373 }
374
375 log::info!("leaving room");
376 Audio::play_sound(Sound::Leave, cx);
377
378 self.clear_state(cx);
379
380 let leave_room = self.client.request(proto::LeaveRoom {});
381 cx.background_spawn(async move {
382 leave_room.await?;
383 anyhow::Ok(())
384 })
385 }
386
387 pub(crate) fn clear_state(&mut self, cx: &mut App) {
388 for project in self.shared_projects.drain() {
389 if let Some(project) = project.upgrade() {
390 project.update(cx, |project, cx| {
391 project.unshare(cx).log_err();
392 });
393 }
394 }
395 for project in self.joined_projects.drain() {
396 if let Some(project) = project.upgrade() {
397 project.update(cx, |project, cx| {
398 project.disconnected_from_host(cx);
399 project.close(cx);
400 });
401 }
402 }
403
404 self.status = RoomStatus::Offline;
405 self.remote_participants.clear();
406 self.pending_participants.clear();
407 self.participant_user_ids.clear();
408 self.client_subscriptions.clear();
409 self.live_kit.take();
410 self.pending_room_update.take();
411 self.maintain_connection.take();
412 }
413
414 async fn maintain_connection(
415 this: WeakEntity<Self>,
416 client: Arc<Client>,
417 mut cx: AsyncApp,
418 ) -> Result<()> {
419 let mut client_status = client.status();
420 loop {
421 let _ = client_status.try_recv();
422 let is_connected = client_status.borrow().is_connected();
423 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
424 if !is_connected || client_status.next().await.is_some() {
425 log::info!("detected client disconnection");
426
427 this.upgrade()
428 .ok_or_else(|| anyhow!("room was dropped"))?
429 .update(&mut cx, |this, cx| {
430 this.status = RoomStatus::Rejoining;
431 cx.notify();
432 })?;
433
434 // Wait for client to re-establish a connection to the server.
435 {
436 let mut reconnection_timeout =
437 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
438 let client_reconnection = async {
439 let mut remaining_attempts = 3;
440 while remaining_attempts > 0 {
441 if client_status.borrow().is_connected() {
442 log::info!("client reconnected, attempting to rejoin room");
443
444 let Some(this) = this.upgrade() else { break };
445 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
446 Ok(task) => {
447 if task.await.log_err().is_some() {
448 return true;
449 } else {
450 remaining_attempts -= 1;
451 }
452 }
453 Err(_app_dropped) => return false,
454 }
455 } else if client_status.borrow().is_signed_out() {
456 return false;
457 }
458
459 log::info!(
460 "waiting for client status change, remaining attempts {}",
461 remaining_attempts
462 );
463 client_status.next().await;
464 }
465 false
466 }
467 .fuse();
468 futures::pin_mut!(client_reconnection);
469
470 futures::select_biased! {
471 reconnected = client_reconnection => {
472 if reconnected {
473 log::info!("successfully reconnected to room");
474 // If we successfully joined the room, go back around the loop
475 // waiting for future connection status changes.
476 continue;
477 }
478 }
479 _ = reconnection_timeout => {
480 log::info!("room reconnection timeout expired");
481 }
482 }
483 }
484
485 break;
486 }
487 }
488
489 // The client failed to re-establish a connection to the server
490 // or an error occurred while trying to re-join the room. Either way
491 // we leave the room and return an error.
492 if let Some(this) = this.upgrade() {
493 log::info!("reconnection failed, leaving room");
494 this.update(&mut cx, |this, cx| this.leave(cx))?.await?;
495 }
496 Err(anyhow!(
497 "can't reconnect to room: client failed to re-establish connection"
498 ))
499 }
500
501 fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
502 let mut projects = HashMap::default();
503 let mut reshared_projects = Vec::new();
504 let mut rejoined_projects = Vec::new();
505 self.shared_projects.retain(|project| {
506 if let Some(handle) = project.upgrade() {
507 let project = handle.read(cx);
508 if let Some(project_id) = project.remote_id() {
509 projects.insert(project_id, handle.clone());
510 reshared_projects.push(proto::UpdateProject {
511 project_id,
512 worktrees: project.worktree_metadata_protos(cx),
513 });
514 return true;
515 }
516 }
517 false
518 });
519 self.joined_projects.retain(|project| {
520 if let Some(handle) = project.upgrade() {
521 let project = handle.read(cx);
522 if let Some(project_id) = project.remote_id() {
523 projects.insert(project_id, handle.clone());
524 rejoined_projects.push(proto::RejoinProject {
525 id: project_id,
526 worktrees: project
527 .worktrees(cx)
528 .map(|worktree| {
529 let worktree = worktree.read(cx);
530 proto::RejoinWorktree {
531 id: worktree.id().to_proto(),
532 scan_id: worktree.completed_scan_id() as u64,
533 }
534 })
535 .collect(),
536 });
537 }
538 return true;
539 }
540 false
541 });
542
543 let response = self.client.request_envelope(proto::RejoinRoom {
544 id: self.id,
545 reshared_projects,
546 rejoined_projects,
547 });
548
549 cx.spawn(|this, mut cx| async move {
550 let response = response.await?;
551 let message_id = response.message_id;
552 let response = response.payload;
553 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
554 this.update(&mut cx, |this, cx| {
555 this.status = RoomStatus::Online;
556 this.apply_room_update(room_proto, cx)?;
557
558 for reshared_project in response.reshared_projects {
559 if let Some(project) = projects.get(&reshared_project.id) {
560 project.update(cx, |project, cx| {
561 project.reshared(reshared_project, cx).log_err();
562 });
563 }
564 }
565
566 for rejoined_project in response.rejoined_projects {
567 if let Some(project) = projects.get(&rejoined_project.id) {
568 project.update(cx, |project, cx| {
569 project.rejoined(rejoined_project, message_id, cx).log_err();
570 });
571 }
572 }
573
574 anyhow::Ok(())
575 })?
576 })
577 }
578
579 pub fn id(&self) -> u64 {
580 self.id
581 }
582
583 pub fn status(&self) -> RoomStatus {
584 self.status
585 }
586
587 pub fn local_participant(&self) -> &LocalParticipant {
588 &self.local_participant
589 }
590
591 pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
592 self.user_store.read(cx).current_user()
593 }
594
595 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
596 &self.remote_participants
597 }
598
599 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
600 self.remote_participants
601 .values()
602 .find(|p| p.peer_id == peer_id)
603 }
604
605 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
606 self.remote_participants
607 .get(&user_id)
608 .map(|participant| participant.role)
609 }
610
611 pub fn contains_guests(&self) -> bool {
612 self.local_participant.role == proto::ChannelRole::Guest
613 || self
614 .remote_participants
615 .values()
616 .any(|p| p.role == proto::ChannelRole::Guest)
617 }
618
619 pub fn local_participant_is_admin(&self) -> bool {
620 self.local_participant.role == proto::ChannelRole::Admin
621 }
622
623 pub fn local_participant_is_guest(&self) -> bool {
624 self.local_participant.role == proto::ChannelRole::Guest
625 }
626
627 pub fn set_participant_role(
628 &mut self,
629 user_id: u64,
630 role: proto::ChannelRole,
631 cx: &Context<Self>,
632 ) -> Task<Result<()>> {
633 let client = self.client.clone();
634 let room_id = self.id;
635 let role = role.into();
636 cx.spawn(|_, _| async move {
637 client
638 .request(proto::SetRoomParticipantRole {
639 room_id,
640 user_id,
641 role,
642 })
643 .await
644 .map(|_| ())
645 })
646 }
647
648 pub fn pending_participants(&self) -> &[Arc<User>] {
649 &self.pending_participants
650 }
651
652 pub fn contains_participant(&self, user_id: u64) -> bool {
653 self.participant_user_ids.contains(&user_id)
654 }
655
656 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
657 self.follows_by_leader_id_project_id
658 .get(&(leader_id, project_id))
659 .map_or(&[], |v| v.as_slice())
660 }
661
662 /// Returns the most 'active' projects, defined as most people in the project
663 pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
664 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
665 for participant in self.remote_participants.values() {
666 match participant.location {
667 ParticipantLocation::SharedProject { project_id } => {
668 project_hosts_and_guest_counts
669 .entry(project_id)
670 .or_default()
671 .1 += 1;
672 }
673 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
674 }
675 for project in &participant.projects {
676 project_hosts_and_guest_counts
677 .entry(project.id)
678 .or_default()
679 .0 = Some(participant.user.id);
680 }
681 }
682
683 if let Some(user) = self.user_store.read(cx).current_user() {
684 for project in &self.local_participant.projects {
685 project_hosts_and_guest_counts
686 .entry(project.id)
687 .or_default()
688 .0 = Some(user.id);
689 }
690 }
691
692 project_hosts_and_guest_counts
693 .into_iter()
694 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
695 .max_by_key(|(_, _, guest_count)| *guest_count)
696 .map(|(id, host, _)| (id, host))
697 }
698
699 async fn handle_room_updated(
700 this: Entity<Self>,
701 envelope: TypedEnvelope<proto::RoomUpdated>,
702 mut cx: AsyncApp,
703 ) -> Result<()> {
704 let room = envelope
705 .payload
706 .room
707 .ok_or_else(|| anyhow!("invalid room"))?;
708 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
709 }
710
711 fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
712 // Filter ourselves out from the room's participants.
713 let local_participant_ix = room
714 .participants
715 .iter()
716 .position(|participant| Some(participant.user_id) == self.client.user_id());
717 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
718
719 let pending_participant_user_ids = room
720 .pending_participants
721 .iter()
722 .map(|p| p.user_id)
723 .collect::<Vec<_>>();
724
725 let remote_participant_user_ids = room
726 .participants
727 .iter()
728 .map(|p| p.user_id)
729 .collect::<Vec<_>>();
730
731 let (remote_participants, pending_participants) =
732 self.user_store.update(cx, move |user_store, cx| {
733 (
734 user_store.get_users(remote_participant_user_ids, cx),
735 user_store.get_users(pending_participant_user_ids, cx),
736 )
737 });
738
739 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
740 let (remote_participants, pending_participants) =
741 futures::join!(remote_participants, pending_participants);
742
743 this.update(&mut cx, |this, cx| {
744 this.participant_user_ids.clear();
745
746 if let Some(participant) = local_participant {
747 let role = participant.role();
748 this.local_participant.projects = participant.projects;
749 if this.local_participant.role != role {
750 this.local_participant.role = role;
751
752 if role == proto::ChannelRole::Guest {
753 for project in mem::take(&mut this.shared_projects) {
754 if let Some(project) = project.upgrade() {
755 this.unshare_project(project, cx).log_err();
756 }
757 }
758 this.local_participant.projects.clear();
759 if let Some(live_kit_room) = &mut this.live_kit {
760 live_kit_room.stop_publishing(cx);
761 }
762 }
763
764 this.joined_projects.retain(|project| {
765 if let Some(project) = project.upgrade() {
766 project.update(cx, |project, cx| project.set_role(role, cx));
767 true
768 } else {
769 false
770 }
771 });
772 }
773 } else {
774 this.local_participant.projects.clear();
775 }
776
777 if let Some(participants) = remote_participants.log_err() {
778 for (participant, user) in room.participants.into_iter().zip(participants) {
779 let Some(peer_id) = participant.peer_id else {
780 continue;
781 };
782 let participant_index = ParticipantIndex(participant.participant_index);
783 this.participant_user_ids.insert(participant.user_id);
784
785 let old_projects = this
786 .remote_participants
787 .get(&participant.user_id)
788 .into_iter()
789 .flat_map(|existing| &existing.projects)
790 .map(|project| project.id)
791 .collect::<HashSet<_>>();
792 let new_projects = participant
793 .projects
794 .iter()
795 .map(|project| project.id)
796 .collect::<HashSet<_>>();
797
798 for project in &participant.projects {
799 if !old_projects.contains(&project.id) {
800 cx.emit(Event::RemoteProjectShared {
801 owner: user.clone(),
802 project_id: project.id,
803 worktree_root_names: project.worktree_root_names.clone(),
804 });
805 }
806 }
807
808 for unshared_project_id in old_projects.difference(&new_projects) {
809 this.joined_projects.retain(|project| {
810 if let Some(project) = project.upgrade() {
811 project.update(cx, |project, cx| {
812 if project.remote_id() == Some(*unshared_project_id) {
813 project.disconnected_from_host(cx);
814 false
815 } else {
816 true
817 }
818 })
819 } else {
820 false
821 }
822 });
823 cx.emit(Event::RemoteProjectUnshared {
824 project_id: *unshared_project_id,
825 });
826 }
827
828 let role = participant.role();
829 let location = ParticipantLocation::from_proto(participant.location)
830 .unwrap_or(ParticipantLocation::External);
831 if let Some(remote_participant) =
832 this.remote_participants.get_mut(&participant.user_id)
833 {
834 remote_participant.peer_id = peer_id;
835 remote_participant.projects = participant.projects;
836 remote_participant.participant_index = participant_index;
837 if location != remote_participant.location
838 || role != remote_participant.role
839 {
840 remote_participant.location = location;
841 remote_participant.role = role;
842 cx.emit(Event::ParticipantLocationChanged {
843 participant_id: peer_id,
844 });
845 }
846 } else {
847 this.remote_participants.insert(
848 participant.user_id,
849 RemoteParticipant {
850 user: user.clone(),
851 participant_index,
852 peer_id,
853 projects: participant.projects,
854 location,
855 role,
856 muted: true,
857 speaking: false,
858 video_tracks: Default::default(),
859 audio_tracks: Default::default(),
860 },
861 );
862
863 Audio::play_sound(Sound::Joined, cx);
864
865 if let Some(live_kit) = this.live_kit.as_ref() {
866 let video_tracks =
867 live_kit.room.remote_video_tracks(&user.id.to_string());
868 let audio_tracks =
869 live_kit.room.remote_audio_tracks(&user.id.to_string());
870 let publications = live_kit
871 .room
872 .remote_audio_track_publications(&user.id.to_string());
873
874 for track in video_tracks {
875 this.live_kit_room_updated(
876 RoomUpdate::SubscribedToRemoteVideoTrack(track),
877 cx,
878 )
879 .log_err();
880 }
881
882 for (track, publication) in
883 audio_tracks.iter().zip(publications.iter())
884 {
885 this.live_kit_room_updated(
886 RoomUpdate::SubscribedToRemoteAudioTrack(
887 track.clone(),
888 publication.clone(),
889 ),
890 cx,
891 )
892 .log_err();
893 }
894 }
895 }
896 }
897
898 this.remote_participants.retain(|user_id, participant| {
899 if this.participant_user_ids.contains(user_id) {
900 true
901 } else {
902 for project in &participant.projects {
903 cx.emit(Event::RemoteProjectUnshared {
904 project_id: project.id,
905 });
906 }
907 false
908 }
909 });
910 }
911
912 if let Some(pending_participants) = pending_participants.log_err() {
913 this.pending_participants = pending_participants;
914 for participant in &this.pending_participants {
915 this.participant_user_ids.insert(participant.id);
916 }
917 }
918
919 this.follows_by_leader_id_project_id.clear();
920 for follower in room.followers {
921 let project_id = follower.project_id;
922 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
923 (Some(leader), Some(follower)) => (leader, follower),
924
925 _ => {
926 log::error!("Follower message {follower:?} missing some state");
927 continue;
928 }
929 };
930
931 let list = this
932 .follows_by_leader_id_project_id
933 .entry((leader, project_id))
934 .or_default();
935 if !list.contains(&follower) {
936 list.push(follower);
937 }
938 }
939
940 this.pending_room_update.take();
941 if this.should_leave() {
942 log::info!("room is empty, leaving");
943 this.leave(cx).detach();
944 }
945
946 this.user_store.update(cx, |user_store, cx| {
947 let participant_indices_by_user_id = this
948 .remote_participants
949 .iter()
950 .map(|(user_id, participant)| (*user_id, participant.participant_index))
951 .collect();
952 user_store.set_participant_indices(participant_indices_by_user_id, cx);
953 });
954
955 this.check_invariants();
956 this.room_update_completed_tx.try_send(Some(())).ok();
957 cx.notify();
958 })
959 .ok();
960 }));
961
962 cx.notify();
963 Ok(())
964 }
965
966 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
967 let mut done_rx = self.room_update_completed_rx.clone();
968 async move {
969 while let Some(result) = done_rx.next().await {
970 if result.is_some() {
971 break;
972 }
973 }
974 }
975 }
976
977 fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
978 match update {
979 RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
980 let user_id = track.publisher_id().parse()?;
981 let track_id = track.sid().to_string();
982 let participant = self
983 .remote_participants
984 .get_mut(&user_id)
985 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
986 participant.video_tracks.insert(track_id.clone(), track);
987 cx.emit(Event::RemoteVideoTracksChanged {
988 participant_id: participant.peer_id,
989 });
990 }
991
992 RoomUpdate::UnsubscribedFromRemoteVideoTrack {
993 publisher_id,
994 track_id,
995 } => {
996 let user_id = publisher_id.parse()?;
997 let participant = self
998 .remote_participants
999 .get_mut(&user_id)
1000 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1001 participant.video_tracks.remove(&track_id);
1002 cx.emit(Event::RemoteVideoTracksChanged {
1003 participant_id: participant.peer_id,
1004 });
1005 }
1006
1007 RoomUpdate::ActiveSpeakersChanged { speakers } => {
1008 let mut speaker_ids = speakers
1009 .into_iter()
1010 .filter_map(|speaker_sid| speaker_sid.parse().ok())
1011 .collect::<Vec<u64>>();
1012 speaker_ids.sort_unstable();
1013 for (sid, participant) in &mut self.remote_participants {
1014 participant.speaking = speaker_ids.binary_search(sid).is_ok();
1015 }
1016 if let Some(id) = self.client.user_id() {
1017 if let Some(room) = &mut self.live_kit {
1018 room.speaking = speaker_ids.binary_search(&id).is_ok();
1019 }
1020 }
1021 }
1022
1023 RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1024 let mut found = false;
1025 for participant in &mut self.remote_participants.values_mut() {
1026 for track in participant.audio_tracks.values() {
1027 if track.sid() == track_id {
1028 found = true;
1029 break;
1030 }
1031 }
1032 if found {
1033 participant.muted = muted;
1034 break;
1035 }
1036 }
1037 }
1038
1039 RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1040 if let Some(live_kit) = &self.live_kit {
1041 if live_kit.deafened {
1042 track.stop();
1043 cx.foreground_executor()
1044 .spawn(publication.set_enabled(false))
1045 .detach();
1046 }
1047 }
1048
1049 let user_id = track.publisher_id().parse()?;
1050 let track_id = track.sid().to_string();
1051 let participant = self
1052 .remote_participants
1053 .get_mut(&user_id)
1054 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1055 participant.audio_tracks.insert(track_id.clone(), track);
1056 participant.muted = publication.is_muted();
1057
1058 cx.emit(Event::RemoteAudioTracksChanged {
1059 participant_id: participant.peer_id,
1060 });
1061 }
1062
1063 RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1064 publisher_id,
1065 track_id,
1066 } => {
1067 let user_id = publisher_id.parse()?;
1068 let participant = self
1069 .remote_participants
1070 .get_mut(&user_id)
1071 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1072 participant.audio_tracks.remove(&track_id);
1073 cx.emit(Event::RemoteAudioTracksChanged {
1074 participant_id: participant.peer_id,
1075 });
1076 }
1077
1078 RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1079 log::info!("unpublished audio track {}", publication.sid());
1080 if let Some(room) = &mut self.live_kit {
1081 room.microphone_track = LocalTrack::None;
1082 }
1083 }
1084
1085 RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1086 log::info!("unpublished video track {}", publication.sid());
1087 if let Some(room) = &mut self.live_kit {
1088 room.screen_track = LocalTrack::None;
1089 }
1090 }
1091
1092 RoomUpdate::LocalAudioTrackPublished { publication } => {
1093 log::info!("published audio track {}", publication.sid());
1094 }
1095
1096 RoomUpdate::LocalVideoTrackPublished { publication } => {
1097 log::info!("published video track {}", publication.sid());
1098 }
1099 }
1100
1101 cx.notify();
1102 Ok(())
1103 }
1104
1105 fn check_invariants(&self) {
1106 #[cfg(any(test, feature = "test-support"))]
1107 {
1108 for participant in self.remote_participants.values() {
1109 assert!(self.participant_user_ids.contains(&participant.user.id));
1110 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1111 }
1112
1113 for participant in &self.pending_participants {
1114 assert!(self.participant_user_ids.contains(&participant.id));
1115 assert_ne!(participant.id, self.client.user_id().unwrap());
1116 }
1117
1118 assert_eq!(
1119 self.participant_user_ids.len(),
1120 self.remote_participants.len() + self.pending_participants.len()
1121 );
1122 }
1123 }
1124
1125 pub(crate) fn call(
1126 &mut self,
1127 called_user_id: u64,
1128 initial_project_id: Option<u64>,
1129 cx: &mut Context<Self>,
1130 ) -> Task<Result<()>> {
1131 if self.status.is_offline() {
1132 return Task::ready(Err(anyhow!("room is offline")));
1133 }
1134
1135 cx.notify();
1136 let client = self.client.clone();
1137 let room_id = self.id;
1138 self.pending_call_count += 1;
1139 cx.spawn(move |this, mut cx| async move {
1140 let result = client
1141 .request(proto::Call {
1142 room_id,
1143 called_user_id,
1144 initial_project_id,
1145 })
1146 .await;
1147 this.update(&mut cx, |this, cx| {
1148 this.pending_call_count -= 1;
1149 if this.should_leave() {
1150 this.leave(cx).detach_and_log_err(cx);
1151 }
1152 })?;
1153 result?;
1154 Ok(())
1155 })
1156 }
1157
1158 pub fn join_project(
1159 &mut self,
1160 id: u64,
1161 language_registry: Arc<LanguageRegistry>,
1162 fs: Arc<dyn Fs>,
1163 cx: &mut Context<Self>,
1164 ) -> Task<Result<Entity<Project>>> {
1165 let client = self.client.clone();
1166 let user_store = self.user_store.clone();
1167 cx.emit(Event::RemoteProjectJoined { project_id: id });
1168 cx.spawn(move |this, mut cx| async move {
1169 let project =
1170 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1171
1172 this.update(&mut cx, |this, cx| {
1173 this.joined_projects.retain(|project| {
1174 if let Some(project) = project.upgrade() {
1175 !project.read(cx).is_disconnected(cx)
1176 } else {
1177 false
1178 }
1179 });
1180 this.joined_projects.insert(project.downgrade());
1181 })?;
1182 Ok(project)
1183 })
1184 }
1185
1186 pub fn share_project(
1187 &mut self,
1188 project: Entity<Project>,
1189 cx: &mut Context<Self>,
1190 ) -> Task<Result<u64>> {
1191 if let Some(project_id) = project.read(cx).remote_id() {
1192 return Task::ready(Ok(project_id));
1193 }
1194
1195 let request = self.client.request(proto::ShareProject {
1196 room_id: self.id(),
1197 worktrees: project.read(cx).worktree_metadata_protos(cx),
1198 is_ssh_project: project.read(cx).is_via_ssh(),
1199 });
1200
1201 cx.spawn(|this, mut cx| async move {
1202 let response = request.await?;
1203
1204 project.update(&mut cx, |project, cx| {
1205 project.shared(response.project_id, cx)
1206 })??;
1207
1208 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1209 this.update(&mut cx, |this, cx| {
1210 this.shared_projects.insert(project.downgrade());
1211 let active_project = this.local_participant.active_project.as_ref();
1212 if active_project.map_or(false, |location| *location == project) {
1213 this.set_location(Some(&project), cx)
1214 } else {
1215 Task::ready(Ok(()))
1216 }
1217 })?
1218 .await?;
1219
1220 Ok(response.project_id)
1221 })
1222 }
1223
1224 pub(crate) fn unshare_project(
1225 &mut self,
1226 project: Entity<Project>,
1227 cx: &mut Context<Self>,
1228 ) -> Result<()> {
1229 let project_id = match project.read(cx).remote_id() {
1230 Some(project_id) => project_id,
1231 None => return Ok(()),
1232 };
1233
1234 self.client.send(proto::UnshareProject { project_id })?;
1235 project.update(cx, |this, cx| this.unshare(cx))?;
1236
1237 if self.local_participant.active_project == Some(project.downgrade()) {
1238 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1239 }
1240 Ok(())
1241 }
1242
1243 pub(crate) fn set_location(
1244 &mut self,
1245 project: Option<&Entity<Project>>,
1246 cx: &mut Context<Self>,
1247 ) -> Task<Result<()>> {
1248 if self.status.is_offline() {
1249 return Task::ready(Err(anyhow!("room is offline")));
1250 }
1251
1252 let client = self.client.clone();
1253 let room_id = self.id;
1254 let location = if let Some(project) = project {
1255 self.local_participant.active_project = Some(project.downgrade());
1256 if let Some(project_id) = project.read(cx).remote_id() {
1257 proto::participant_location::Variant::SharedProject(
1258 proto::participant_location::SharedProject { id: project_id },
1259 )
1260 } else {
1261 proto::participant_location::Variant::UnsharedProject(
1262 proto::participant_location::UnsharedProject {},
1263 )
1264 }
1265 } else {
1266 self.local_participant.active_project = None;
1267 proto::participant_location::Variant::External(proto::participant_location::External {})
1268 };
1269
1270 cx.notify();
1271 cx.background_spawn(async move {
1272 client
1273 .request(proto::UpdateParticipantLocation {
1274 room_id,
1275 location: Some(proto::ParticipantLocation {
1276 variant: Some(location),
1277 }),
1278 })
1279 .await?;
1280 Ok(())
1281 })
1282 }
1283
1284 pub fn is_screen_sharing(&self) -> bool {
1285 self.live_kit.as_ref().map_or(false, |live_kit| {
1286 !matches!(live_kit.screen_track, LocalTrack::None)
1287 })
1288 }
1289
1290 pub fn is_sharing_mic(&self) -> bool {
1291 self.live_kit.as_ref().map_or(false, |live_kit| {
1292 !matches!(live_kit.microphone_track, LocalTrack::None)
1293 })
1294 }
1295
1296 pub fn is_muted(&self) -> bool {
1297 self.live_kit.as_ref().map_or(false, |live_kit| {
1298 matches!(live_kit.microphone_track, LocalTrack::None)
1299 || live_kit.muted_by_user
1300 || live_kit.deafened
1301 })
1302 }
1303
1304 pub fn muted_by_user(&self) -> bool {
1305 self.live_kit
1306 .as_ref()
1307 .map_or(false, |live_kit| live_kit.muted_by_user)
1308 }
1309
1310 pub fn is_speaking(&self) -> bool {
1311 self.live_kit
1312 .as_ref()
1313 .map_or(false, |live_kit| live_kit.speaking)
1314 }
1315
1316 pub fn is_deafened(&self) -> Option<bool> {
1317 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1318 }
1319
1320 pub fn can_use_microphone(&self) -> bool {
1321 use proto::ChannelRole::*;
1322 match self.local_participant.role {
1323 Admin | Member | Talker => true,
1324 Guest | Banned => false,
1325 }
1326 }
1327
1328 pub fn can_share_projects(&self) -> bool {
1329 use proto::ChannelRole::*;
1330 match self.local_participant.role {
1331 Admin | Member => true,
1332 Guest | Banned | Talker => false,
1333 }
1334 }
1335
1336 #[track_caller]
1337 pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1338 if self.status.is_offline() {
1339 return Task::ready(Err(anyhow!("room is offline")));
1340 }
1341
1342 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1343 let publish_id = post_inc(&mut live_kit.next_publish_id);
1344 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1345 cx.notify();
1346 publish_id
1347 } else {
1348 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1349 };
1350
1351 cx.spawn(move |this, mut cx| async move {
1352 let publish_track = async {
1353 let track = LocalAudioTrack::create();
1354 this.upgrade()
1355 .ok_or_else(|| anyhow!("room was dropped"))?
1356 .update(&mut cx, |this, _| {
1357 this.live_kit
1358 .as_ref()
1359 .map(|live_kit| live_kit.room.publish_audio_track(track))
1360 })?
1361 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1362 .await
1363 };
1364 let publication = publish_track.await;
1365 this.upgrade()
1366 .ok_or_else(|| anyhow!("room was dropped"))?
1367 .update(&mut cx, |this, cx| {
1368 let live_kit = this
1369 .live_kit
1370 .as_mut()
1371 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1372
1373 let canceled = if let LocalTrack::Pending {
1374 publish_id: cur_publish_id,
1375 } = &live_kit.microphone_track
1376 {
1377 *cur_publish_id != publish_id
1378 } else {
1379 true
1380 };
1381
1382 match publication {
1383 Ok(publication) => {
1384 if canceled {
1385 live_kit.room.unpublish_track(publication);
1386 } else {
1387 if live_kit.muted_by_user || live_kit.deafened {
1388 cx.background_spawn(publication.set_mute(true)).detach();
1389 }
1390 live_kit.microphone_track = LocalTrack::Published {
1391 track_publication: publication,
1392 };
1393 cx.notify();
1394 }
1395 Ok(())
1396 }
1397 Err(error) => {
1398 if canceled {
1399 Ok(())
1400 } else {
1401 live_kit.microphone_track = LocalTrack::None;
1402 cx.notify();
1403 Err(error)
1404 }
1405 }
1406 }
1407 })?
1408 })
1409 }
1410
1411 pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1412 if self.status.is_offline() {
1413 return Task::ready(Err(anyhow!("room is offline")));
1414 } else if self.is_screen_sharing() {
1415 return Task::ready(Err(anyhow!("screen was already shared")));
1416 }
1417
1418 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1419 let publish_id = post_inc(&mut live_kit.next_publish_id);
1420 live_kit.screen_track = LocalTrack::Pending { publish_id };
1421 cx.notify();
1422 (live_kit.room.display_sources(), publish_id)
1423 } else {
1424 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1425 };
1426
1427 cx.spawn(move |this, mut cx| async move {
1428 let publish_track = async {
1429 let displays = displays.await?;
1430 let display = displays
1431 .first()
1432 .ok_or_else(|| anyhow!("no display found"))?;
1433 let track = LocalVideoTrack::screen_share_for_display(display);
1434 this.upgrade()
1435 .ok_or_else(|| anyhow!("room was dropped"))?
1436 .update(&mut cx, |this, _| {
1437 this.live_kit
1438 .as_ref()
1439 .map(|live_kit| live_kit.room.publish_video_track(track))
1440 })?
1441 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1442 .await
1443 };
1444
1445 let publication = publish_track.await;
1446 this.upgrade()
1447 .ok_or_else(|| anyhow!("room was dropped"))?
1448 .update(&mut cx, |this, cx| {
1449 let live_kit = this
1450 .live_kit
1451 .as_mut()
1452 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1453
1454 let canceled = if let LocalTrack::Pending {
1455 publish_id: cur_publish_id,
1456 } = &live_kit.screen_track
1457 {
1458 *cur_publish_id != publish_id
1459 } else {
1460 true
1461 };
1462
1463 match publication {
1464 Ok(publication) => {
1465 if canceled {
1466 live_kit.room.unpublish_track(publication);
1467 } else {
1468 live_kit.screen_track = LocalTrack::Published {
1469 track_publication: publication,
1470 };
1471 cx.notify();
1472 }
1473
1474 Audio::play_sound(Sound::StartScreenshare, cx);
1475
1476 Ok(())
1477 }
1478 Err(error) => {
1479 if canceled {
1480 Ok(())
1481 } else {
1482 live_kit.screen_track = LocalTrack::None;
1483 cx.notify();
1484 Err(error)
1485 }
1486 }
1487 }
1488 })?
1489 })
1490 }
1491
1492 pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1493 if let Some(live_kit) = self.live_kit.as_mut() {
1494 // When unmuting, undeafen if the user was deafened before.
1495 let was_deafened = live_kit.deafened;
1496 if live_kit.muted_by_user
1497 || live_kit.deafened
1498 || matches!(live_kit.microphone_track, LocalTrack::None)
1499 {
1500 live_kit.muted_by_user = false;
1501 live_kit.deafened = false;
1502 } else {
1503 live_kit.muted_by_user = true;
1504 }
1505 let muted = live_kit.muted_by_user;
1506 let should_undeafen = was_deafened && !live_kit.deafened;
1507
1508 if let Some(task) = self.set_mute(muted, cx) {
1509 task.detach_and_log_err(cx);
1510 }
1511
1512 if should_undeafen {
1513 if let Some(task) = self.set_deafened(false, cx) {
1514 task.detach_and_log_err(cx);
1515 }
1516 }
1517 }
1518 }
1519
1520 pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1521 if let Some(live_kit) = self.live_kit.as_mut() {
1522 // When deafening, mute the microphone if it was not already muted.
1523 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1524 let deafened = !live_kit.deafened;
1525 live_kit.deafened = deafened;
1526 let should_change_mute = !live_kit.muted_by_user;
1527
1528 if let Some(task) = self.set_deafened(deafened, cx) {
1529 task.detach_and_log_err(cx);
1530 }
1531
1532 if should_change_mute {
1533 if let Some(task) = self.set_mute(deafened, cx) {
1534 task.detach_and_log_err(cx);
1535 }
1536 }
1537 }
1538 }
1539
1540 pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1541 if self.status.is_offline() {
1542 return Err(anyhow!("room is offline"));
1543 }
1544
1545 let live_kit = self
1546 .live_kit
1547 .as_mut()
1548 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1549 match mem::take(&mut live_kit.screen_track) {
1550 LocalTrack::None => Err(anyhow!("screen was not shared")),
1551 LocalTrack::Pending { .. } => {
1552 cx.notify();
1553 Ok(())
1554 }
1555 LocalTrack::Published {
1556 track_publication, ..
1557 } => {
1558 live_kit.room.unpublish_track(track_publication);
1559 cx.notify();
1560
1561 Audio::play_sound(Sound::StopScreenshare, cx);
1562 Ok(())
1563 }
1564 }
1565 }
1566
1567 fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1568 let live_kit = self.live_kit.as_mut()?;
1569 cx.notify();
1570
1571 let mut track_updates = Vec::new();
1572 for participant in self.remote_participants.values() {
1573 for publication in live_kit
1574 .room
1575 .remote_audio_track_publications(&participant.user.id.to_string())
1576 {
1577 track_updates.push(publication.set_enabled(!deafened));
1578 }
1579
1580 for track in participant.audio_tracks.values() {
1581 if deafened {
1582 track.stop();
1583 } else {
1584 track.start();
1585 }
1586 }
1587 }
1588
1589 Some(cx.foreground_executor().spawn(async move {
1590 for result in futures::future::join_all(track_updates).await {
1591 result?;
1592 }
1593 Ok(())
1594 }))
1595 }
1596
1597 fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1598 let live_kit = self.live_kit.as_mut()?;
1599 cx.notify();
1600
1601 if should_mute {
1602 Audio::play_sound(Sound::Mute, cx);
1603 } else {
1604 Audio::play_sound(Sound::Unmute, cx);
1605 }
1606
1607 match &mut live_kit.microphone_track {
1608 LocalTrack::None => {
1609 if should_mute {
1610 None
1611 } else {
1612 Some(self.share_microphone(cx))
1613 }
1614 }
1615 LocalTrack::Pending { .. } => None,
1616 LocalTrack::Published { track_publication } => Some(
1617 cx.foreground_executor()
1618 .spawn(track_publication.set_mute(should_mute)),
1619 ),
1620 }
1621 }
1622
1623 #[cfg(any(test, feature = "test-support"))]
1624 pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
1625 self.live_kit
1626 .as_ref()
1627 .unwrap()
1628 .room
1629 .set_display_sources(sources);
1630 }
1631}
1632
1633struct LiveKitRoom {
1634 room: Arc<livekit_client_macos::Room>,
1635 screen_track: LocalTrack,
1636 microphone_track: LocalTrack,
1637 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1638 muted_by_user: bool,
1639 deafened: bool,
1640 speaking: bool,
1641 next_publish_id: usize,
1642 _maintain_room: Task<()>,
1643 _handle_updates: Task<()>,
1644}
1645
1646impl LiveKitRoom {
1647 fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1648 if let LocalTrack::Published {
1649 track_publication, ..
1650 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1651 {
1652 self.room.unpublish_track(track_publication);
1653 cx.notify();
1654 }
1655
1656 if let LocalTrack::Published {
1657 track_publication, ..
1658 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1659 {
1660 self.room.unpublish_track(track_publication);
1661 cx.notify();
1662 }
1663 }
1664}
1665
1666enum LocalTrack {
1667 None,
1668 Pending {
1669 publish_id: usize,
1670 },
1671 Published {
1672 track_publication: LocalTrackPublication,
1673 },
1674}
1675
1676impl Default for LocalTrack {
1677 fn default() -> Self {
1678 Self::None
1679 }
1680}
1681
/// Connection status of a [`Room`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    // NOTE(review): presumably the state while a reconnect is being attempted — confirm
    // against the code that assigns it. It counts as neither online nor offline below.
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; [`RoomStatus::Rejoining`] is not
    /// considered online.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}