1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
15use language::LanguageRegistry;
16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
17use postage::{sink::Sink, stream::Stream, watch};
18use project::Project;
19use settings::Settings as _;
20use std::{future::Future, mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
/// Maximum time `Room::maintain_connection` waits for the client to reconnect
/// and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A room was joined.
    // NOTE(review): not emitted in this part of the file; presumably emitted by
    // the code that owns the active call — confirm.
    RoomJoined {
        channel_id: Option<ChannelId>,
    },
    /// A remote participant's location or role differs from what was previously
    /// known for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's video track was subscribed to or unsubscribed from.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's audio track was subscribed to or unsubscribed from.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant now lists a project that they did not list before.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A remote participant no longer lists a project, or left the room while
    /// still listing it.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted when `Room::join_project` is called for the given project.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// An invitation to a remote project was discarded.
    // NOTE(review): not emitted in this part of the file — confirm the emitter.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// A room was left.
    // NOTE(review): not emitted in this part of the file — confirm the emitter.
    RoomLeft {
        channel_id: Option<ChannelId>,
    },
}
57
/// Client-side state of a call room: its participants, the projects shared in
/// it, the optional LiveKit connection, and the tasks that keep it in sync with
/// the server.
pub struct Room {
    /// Room id as reported by the server.
    id: u64,
    /// Channel this room belongs to; `None` for rooms created via `Room::create`.
    channel_id: Option<ChannelId>,
    /// LiveKit state; `None` when no connection info was supplied or once the
    /// room state has been cleared.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Local projects that have been shared into this room.
    shared_projects: HashSet<WeakEntity<Project>>,
    /// Remote projects joined through this room.
    joined_projects: HashSet<WeakEntity<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed by the server as pending participants of the room.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have not completed yet.
    pending_call_count: usize,
    /// Whether the room should be left automatically once nobody else is in it
    /// (see `should_leave`).
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Entity<UserStore>,
    /// Followers of each `(leader, project id)` pair.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Keeps the `RoomUpdated` message handler registered.
    client_subscriptions: Vec<client::Subscription>,
    /// Keeps the release and app-quit callbacks registered.
    _subscriptions: Vec<gpui::Subscription>,
    /// Signalled with `Some(())` every time a room update has been fully applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// The in-flight task applying the most recent room update, if any.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped when state is cleared.
    maintain_connection: Option<Task<Option<()>>>,
}
81
// Allows `Room` to emit [`Event`]s via `cx.emit`.
impl EventEmitter<Event> for Room {}
83
84impl Room {
 /// Returns the id of the channel this room belongs to, if any.
 pub fn channel_id(&self) -> Option<ChannelId> {
     self.channel_id
 }
88
 /// Returns `true` while at least one project is tracked as shared in this room.
 pub fn is_sharing_project(&self) -> bool {
     !self.shared_projects.is_empty()
 }
92
93 #[cfg(any(test, feature = "test-support"))]
94 pub fn is_connected(&self) -> bool {
95 if let Some(live_kit) = self.live_kit.as_ref() {
96 matches!(
97 *live_kit.room.status().borrow(),
98 livekit_client_macos::ConnectionState::Connected { .. }
99 )
100 } else {
101 false
102 }
103 }
104
105 fn new(
106 id: u64,
107 channel_id: Option<ChannelId>,
108 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
109 client: Arc<Client>,
110 user_store: Entity<UserStore>,
111 cx: &mut Context<Self>,
112 ) -> Self {
113 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
114 let room = livekit_client_macos::Room::new();
115 let mut status = room.status();
116 // Consume the initial status of the room.
117 let _ = status.try_recv();
118 let _maintain_room = cx.spawn(async move |this, cx| {
119 while let Some(status) = status.next().await {
120 let this = if let Some(this) = this.upgrade() {
121 this
122 } else {
123 break;
124 };
125
126 if status == livekit_client_macos::ConnectionState::Disconnected {
127 this.update(cx, |this, cx| this.leave(cx).log_err()).ok();
128 break;
129 }
130 }
131 });
132
133 let _handle_updates = cx.spawn({
134 let room = room.clone();
135 async move |this, cx| {
136 let mut updates = room.updates();
137 while let Some(update) = updates.next().await {
138 let this = if let Some(this) = this.upgrade() {
139 this
140 } else {
141 break;
142 };
143
144 this.update(cx, |this, cx| {
145 this.live_kit_room_updated(update, cx).log_err()
146 })
147 .ok();
148 }
149 }
150 });
151
152 let connect = room.connect(&connection_info.server_url, &connection_info.token);
153 cx.spawn(async move |this, cx| {
154 connect.await?;
155 this.update(cx, |this, cx| {
156 if this.can_use_microphone() {
157 if let Some(live_kit) = &this.live_kit {
158 if !live_kit.muted_by_user && !live_kit.deafened {
159 return this.share_microphone(cx);
160 }
161 }
162 }
163 Task::ready(Ok(()))
164 })?
165 .await
166 })
167 .detach_and_log_err(cx);
168
169 Some(LiveKitRoom {
170 room,
171 screen_track: LocalTrack::None,
172 microphone_track: LocalTrack::None,
173 next_publish_id: 0,
174 muted_by_user: Self::mute_on_join(cx),
175 deafened: false,
176 speaking: false,
177 _maintain_room,
178 _handle_updates,
179 })
180 } else {
181 None
182 };
183
184 let maintain_connection = cx.spawn({
185 let client = client.clone();
186 async move |this, cx| {
187 Self::maintain_connection(this, client.clone(), cx)
188 .log_err()
189 .await
190 }
191 });
192
193 Audio::play_sound(Sound::Joined, cx);
194
195 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
196
197 Self {
198 id,
199 channel_id,
200 live_kit: live_kit_room,
201 status: RoomStatus::Online,
202 shared_projects: Default::default(),
203 joined_projects: Default::default(),
204 participant_user_ids: Default::default(),
205 local_participant: Default::default(),
206 remote_participants: Default::default(),
207 pending_participants: Default::default(),
208 pending_call_count: 0,
209 client_subscriptions: vec![
210 client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
211 ],
212 _subscriptions: vec![
213 cx.on_release(Self::released),
214 cx.on_app_quit(Self::app_will_quit),
215 ],
216 leave_when_empty: false,
217 pending_room_update: None,
218 client,
219 user_store,
220 follows_by_leader_id_project_id: Default::default(),
221 maintain_connection: Some(maintain_connection),
222 room_update_completed_tx,
223 room_update_completed_rx,
224 }
225 }
226
227 pub(crate) fn create(
228 called_user_id: u64,
229 initial_project: Option<Entity<Project>>,
230 client: Arc<Client>,
231 user_store: Entity<UserStore>,
232 cx: &mut App,
233 ) -> Task<Result<Entity<Self>>> {
234 cx.spawn(async move |cx| {
235 let response = client.request(proto::CreateRoom {}).await?;
236 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
237 let room = cx.new(|cx| {
238 let mut room = Self::new(
239 room_proto.id,
240 None,
241 response.live_kit_connection_info,
242 client,
243 user_store,
244 cx,
245 );
246 if let Some(participant) = room_proto.participants.first() {
247 room.local_participant.role = participant.role()
248 }
249 room
250 })?;
251
252 let initial_project_id = if let Some(initial_project) = initial_project {
253 let initial_project_id = room
254 .update(cx, |room, cx| {
255 room.share_project(initial_project.clone(), cx)
256 })?
257 .await?;
258 Some(initial_project_id)
259 } else {
260 None
261 };
262
263 let did_join = room
264 .update(cx, |room, cx| {
265 room.leave_when_empty = true;
266 room.call(called_user_id, initial_project_id, cx)
267 })?
268 .await;
269 match did_join {
270 Ok(()) => Ok(room),
271 Err(error) => Err(error.context("room creation failed")),
272 }
273 })
274 }
275
276 pub(crate) async fn join_channel(
277 channel_id: ChannelId,
278 client: Arc<Client>,
279 user_store: Entity<UserStore>,
280 cx: &mut AsyncApp,
281 ) -> Result<Entity<Self>> {
282 Self::from_join_response(
283 client
284 .request(proto::JoinChannel {
285 channel_id: channel_id.0,
286 })
287 .await?,
288 client,
289 user_store,
290 cx,
291 )
292 }
293
294 pub(crate) async fn join(
295 room_id: u64,
296 client: Arc<Client>,
297 user_store: Entity<UserStore>,
298 cx: &mut AsyncApp,
299 ) -> Result<Entity<Self>> {
300 Self::from_join_response(
301 client.request(proto::JoinRoom { id: room_id }).await?,
302 client,
303 user_store,
304 cx,
305 )
306 }
307
308 fn released(&mut self, cx: &mut App) {
309 if self.status.is_online() {
310 self.leave_internal(cx).detach_and_log_err(cx);
311 }
312 }
313
314 fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
315 let task = if self.status.is_online() {
316 let leave = self.leave_internal(cx);
317 Some(cx.background_spawn(async move {
318 leave.await.log_err();
319 }))
320 } else {
321 None
322 };
323
324 async move {
325 if let Some(task) = task {
326 task.await;
327 }
328 }
329 }
330
 /// Whether the microphone should start muted when joining a room: true when
 /// the `mute_on_join` call setting is enabled or when `IMPERSONATE_LOGIN` is
 /// set.
 pub fn mute_on_join(cx: &App) -> bool {
     CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 }
334
335 fn from_join_response(
336 response: proto::JoinRoomResponse,
337 client: Arc<Client>,
338 user_store: Entity<UserStore>,
339 cx: &mut AsyncApp,
340 ) -> Result<Entity<Self>> {
341 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
342 let room = cx.new(|cx| {
343 Self::new(
344 room_proto.id,
345 response.channel_id.map(ChannelId),
346 response.live_kit_connection_info,
347 client,
348 user_store,
349 cx,
350 )
351 })?;
352 room.update(cx, |room, cx| {
353 room.leave_when_empty = room.channel_id.is_none();
354 room.apply_room_update(room_proto, cx)?;
355 anyhow::Ok(())
356 })??;
357 Ok(room)
358 }
359
360 fn should_leave(&self) -> bool {
361 self.leave_when_empty
362 && self.pending_room_update.is_none()
363 && self.pending_participants.is_empty()
364 && self.remote_participants.is_empty()
365 && self.pending_call_count == 0
366 }
367
 /// Notifies observers and leaves the room; see `leave_internal` for the
 /// actual work and the error returned when the room is already offline.
 pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
     cx.notify();
     self.leave_internal(cx)
 }
372
373 fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
374 if self.status.is_offline() {
375 return Task::ready(Err(anyhow!("room is offline")));
376 }
377
378 log::info!("leaving room");
379 Audio::play_sound(Sound::Leave, cx);
380
381 self.clear_state(cx);
382
383 let leave_room = self.client.request(proto::LeaveRoom {});
384 cx.background_spawn(async move {
385 leave_room.await?;
386 anyhow::Ok(())
387 })
388 }
389
390 pub(crate) fn clear_state(&mut self, cx: &mut App) {
391 for project in self.shared_projects.drain() {
392 if let Some(project) = project.upgrade() {
393 project.update(cx, |project, cx| {
394 project.unshare(cx).log_err();
395 });
396 }
397 }
398 for project in self.joined_projects.drain() {
399 if let Some(project) = project.upgrade() {
400 project.update(cx, |project, cx| {
401 project.disconnected_from_host(cx);
402 project.close(cx);
403 });
404 }
405 }
406
407 self.status = RoomStatus::Offline;
408 self.remote_participants.clear();
409 self.pending_participants.clear();
410 self.participant_user_ids.clear();
411 self.client_subscriptions.clear();
412 self.live_kit.take();
413 self.pending_room_update.take();
414 self.maintain_connection.take();
415 }
416
 /// Watches the client's connection status and tries to rejoin the room
 /// whenever the connection is lost.
 ///
 /// On a detected disconnection the room is marked `Rejoining`, then up to
 /// three rejoin attempts are made as the client reports being connected
 /// again, all bounded by `RECONNECT_TIMEOUT`. A successful rejoin resumes
 /// watching. If the client signs out, the attempts are exhausted, or the
 /// timeout expires, the room is left (when it still exists) and an error is
 /// returned. Also returns an error if the room entity has been dropped when a
 /// disconnection is detected.
 async fn maintain_connection(
     this: WeakEntity<Self>,
     client: Arc<Client>,
     cx: &mut AsyncApp,
 ) -> Result<()> {
     let mut client_status = client.status();
     loop {
         // Consume the value currently available so that `next()` below only
         // resolves on a subsequent status change.
         let _ = client_status.try_recv();
         let is_connected = client_status.borrow().is_connected();
         // Even if we're initially connected, any future change of the status means we momentarily disconnected.
         if !is_connected || client_status.next().await.is_some() {
             log::info!("detected client disconnection");

             this.upgrade()
                 .ok_or_else(|| anyhow!("room was dropped"))?
                 .update(cx, |this, cx| {
                     this.status = RoomStatus::Rejoining;
                     cx.notify();
                 })?;

             // Wait for client to re-establish a connection to the server.
             {
                 let mut reconnection_timeout =
                     cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                 // Resolves to `true` once the room was rejoined, `false` when
                 // rejoining is no longer possible or the attempts ran out.
                 let client_reconnection = async {
                     let mut remaining_attempts = 3;
                     while remaining_attempts > 0 {
                         if client_status.borrow().is_connected() {
                             log::info!("client reconnected, attempting to rejoin room");

                             let Some(this) = this.upgrade() else { break };
                             match this.update(cx, |this, cx| this.rejoin(cx)) {
                                 Ok(task) => {
                                     if task.await.log_err().is_some() {
                                         return true;
                                     } else {
                                         // Only failed rejoin requests consume an attempt.
                                         remaining_attempts -= 1;
                                     }
                                 }
                                 Err(_app_dropped) => return false,
                             }
                         } else if client_status.borrow().is_signed_out() {
                             return false;
                         }

                         log::info!(
                             "waiting for client status change, remaining attempts {}",
                             remaining_attempts
                         );
                         client_status.next().await;
                     }
                     false
                 }
                 .fuse();
                 futures::pin_mut!(client_reconnection);

                 // Biased towards the reconnection future: when both are ready,
                 // its outcome is taken over the timeout.
                 futures::select_biased! {
                     reconnected = client_reconnection => {
                         if reconnected {
                             log::info!("successfully reconnected to room");
                             // If we successfully joined the room, go back around the loop
                             // waiting for future connection status changes.
                             continue;
                         }
                     }
                     _ = reconnection_timeout => {
                         log::info!("room reconnection timeout expired");
                     }
                 }
             }

             break;
         }
     }

     // The client failed to re-establish a connection to the server
     // or an error occurred while trying to re-join the room. Either way
     // we leave the room and return an error.
     if let Some(this) = this.upgrade() {
         log::info!("reconnection failed, leaving room");
         this.update(cx, |this, cx| this.leave(cx))?.await?;
     }
     Err(anyhow!(
         "can't reconnect to room: client failed to re-establish connection"
     ))
 }
503
504 fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
505 let mut projects = HashMap::default();
506 let mut reshared_projects = Vec::new();
507 let mut rejoined_projects = Vec::new();
508 self.shared_projects.retain(|project| {
509 if let Some(handle) = project.upgrade() {
510 let project = handle.read(cx);
511 if let Some(project_id) = project.remote_id() {
512 projects.insert(project_id, handle.clone());
513 reshared_projects.push(proto::UpdateProject {
514 project_id,
515 worktrees: project.worktree_metadata_protos(cx),
516 });
517 return true;
518 }
519 }
520 false
521 });
522 self.joined_projects.retain(|project| {
523 if let Some(handle) = project.upgrade() {
524 let project = handle.read(cx);
525 if let Some(project_id) = project.remote_id() {
526 projects.insert(project_id, handle.clone());
527 rejoined_projects.push(proto::RejoinProject {
528 id: project_id,
529 worktrees: project
530 .worktrees(cx)
531 .map(|worktree| {
532 let worktree = worktree.read(cx);
533 proto::RejoinWorktree {
534 id: worktree.id().to_proto(),
535 scan_id: worktree.completed_scan_id() as u64,
536 }
537 })
538 .collect(),
539 });
540 }
541 return true;
542 }
543 false
544 });
545
546 let response = self.client.request_envelope(proto::RejoinRoom {
547 id: self.id,
548 reshared_projects,
549 rejoined_projects,
550 });
551
552 cx.spawn(async move |this, cx| {
553 let response = response.await?;
554 let message_id = response.message_id;
555 let response = response.payload;
556 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
557 this.update(cx, |this, cx| {
558 this.status = RoomStatus::Online;
559 this.apply_room_update(room_proto, cx)?;
560
561 for reshared_project in response.reshared_projects {
562 if let Some(project) = projects.get(&reshared_project.id) {
563 project.update(cx, |project, cx| {
564 project.reshared(reshared_project, cx).log_err();
565 });
566 }
567 }
568
569 for rejoined_project in response.rejoined_projects {
570 if let Some(project) = projects.get(&rejoined_project.id) {
571 project.update(cx, |project, cx| {
572 project.rejoined(rejoined_project, message_id, cx).log_err();
573 });
574 }
575 }
576
577 anyhow::Ok(())
578 })?
579 })
580 }
581
 /// Returns the room's id.
 pub fn id(&self) -> u64 {
     self.id
 }
585
 /// Returns the room's current connection status.
 pub fn status(&self) -> RoomStatus {
     self.status
 }
589
 /// Returns the state tracked for the local participant.
 pub fn local_participant(&self) -> &LocalParticipant {
     &self.local_participant
 }
593
 /// Returns the user store's current user, if any.
 pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
     self.user_store.read(cx).current_user()
 }
597
 /// Returns the remote participants, keyed by user id.
 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
     &self.remote_participants
 }
601
 /// Finds the remote participant with the given peer id via a linear scan of
 /// all remote participants.
 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
     self.remote_participants
         .values()
         .find(|p| p.peer_id == peer_id)
 }
607
608 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
609 self.remote_participants
610 .get(&user_id)
611 .map(|participant| participant.role)
612 }
613
614 pub fn contains_guests(&self) -> bool {
615 self.local_participant.role == proto::ChannelRole::Guest
616 || self
617 .remote_participants
618 .values()
619 .any(|p| p.role == proto::ChannelRole::Guest)
620 }
621
622 pub fn local_participant_is_admin(&self) -> bool {
623 self.local_participant.role == proto::ChannelRole::Admin
624 }
625
626 pub fn local_participant_is_guest(&self) -> bool {
627 self.local_participant.role == proto::ChannelRole::Guest
628 }
629
630 pub fn set_participant_role(
631 &mut self,
632 user_id: u64,
633 role: proto::ChannelRole,
634 cx: &Context<Self>,
635 ) -> Task<Result<()>> {
636 let client = self.client.clone();
637 let room_id = self.id;
638 let role = role.into();
639 cx.spawn(async move |_, _| {
640 client
641 .request(proto::SetRoomParticipantRole {
642 room_id,
643 user_id,
644 role,
645 })
646 .await
647 .map(|_| ())
648 })
649 }
650
 /// Returns the users currently listed as pending participants.
 pub fn pending_participants(&self) -> &[Arc<User>] {
     &self.pending_participants
 }
654
 /// Returns `true` if `user_id` belongs to a remote or pending participant.
 pub fn contains_participant(&self, user_id: u64) -> bool {
     self.participant_user_ids.contains(&user_id)
 }
658
659 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
660 self.follows_by_leader_id_project_id
661 .get(&(leader_id, project_id))
662 .map_or(&[], |v| v.as_slice())
663 }
664
665 /// Returns the most 'active' projects, defined as most people in the project
666 pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
667 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
668 for participant in self.remote_participants.values() {
669 match participant.location {
670 ParticipantLocation::SharedProject { project_id } => {
671 project_hosts_and_guest_counts
672 .entry(project_id)
673 .or_default()
674 .1 += 1;
675 }
676 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
677 }
678 for project in &participant.projects {
679 project_hosts_and_guest_counts
680 .entry(project.id)
681 .or_default()
682 .0 = Some(participant.user.id);
683 }
684 }
685
686 if let Some(user) = self.user_store.read(cx).current_user() {
687 for project in &self.local_participant.projects {
688 project_hosts_and_guest_counts
689 .entry(project.id)
690 .or_default()
691 .0 = Some(user.id);
692 }
693 }
694
695 project_hosts_and_guest_counts
696 .into_iter()
697 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
698 .max_by_key(|(_, _, guest_count)| *guest_count)
699 .map(|(id, host, _)| (id, host))
700 }
701
702 async fn handle_room_updated(
703 this: Entity<Self>,
704 envelope: TypedEnvelope<proto::RoomUpdated>,
705 mut cx: AsyncApp,
706 ) -> Result<()> {
707 let room = envelope
708 .payload
709 .room
710 .ok_or_else(|| anyhow!("invalid room"))?;
711 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
712 }
713
 /// Reconciles local state with the room state sent by the server.
 ///
 /// The users for all remote and pending participants are requested from the
 /// user store first; the actual reconciliation runs in a spawned task once
 /// they are available. That task is stored in `pending_room_update`,
 /// replacing any previous one. This method itself only starts the task,
 /// notifies observers and returns `Ok(())`.
 ///
 /// When the task runs it updates the local participant's role and projects,
 /// adds, updates and removes remote participants (emitting project and
 /// location events), replaces the pending participants and the follower
 /// table, leaves the room if `should_leave` says so, and finally signals
 /// `room_update_completed_tx`.
 fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
     // Filter ourselves out from the room's participants.
     let local_participant_ix = room
         .participants
         .iter()
         .position(|participant| Some(participant.user_id) == self.client.user_id());
     let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

     let pending_participant_user_ids = room
         .pending_participants
         .iter()
         .map(|p| p.user_id)
         .collect::<Vec<_>>();

     // Collected after the local participant was removed, so the ids are in the
     // same order as `room.participants`, which is zipped with the users below.
     let remote_participant_user_ids = room
         .participants
         .iter()
         .map(|p| p.user_id)
         .collect::<Vec<_>>();

     let (remote_participants, pending_participants) =
         self.user_store.update(cx, move |user_store, cx| {
             (
                 user_store.get_users(remote_participant_user_ids, cx),
                 user_store.get_users(pending_participant_user_ids, cx),
             )
         });

     self.pending_room_update = Some(cx.spawn(async move |this, cx| {
         let (remote_participants, pending_participants) =
             futures::join!(remote_participants, pending_participants);

         this.update(cx, |this, cx| {
             // Rebuilt below from the remote and pending participants.
             this.participant_user_ids.clear();

             if let Some(participant) = local_participant {
                 let role = participant.role();
                 this.local_participant.projects = participant.projects;
                 if this.local_participant.role != role {
                     this.local_participant.role = role;

                     // When the local role becomes `Guest`: unshare all projects
                     // and stop publishing to LiveKit.
                     if role == proto::ChannelRole::Guest {
                         for project in mem::take(&mut this.shared_projects) {
                             if let Some(project) = project.upgrade() {
                                 this.unshare_project(project, cx).log_err();
                             }
                         }
                         this.local_participant.projects.clear();
                         if let Some(live_kit_room) = &mut this.live_kit {
                             live_kit_room.stop_publishing(cx);
                         }
                     }

                     // Propagate the new role to joined projects, dropping
                     // entries whose project no longer exists.
                     this.joined_projects.retain(|project| {
                         if let Some(project) = project.upgrade() {
                             project.update(cx, |project, cx| project.set_role(role, cx));
                             true
                         } else {
                             false
                         }
                     });
                 }
             } else {
                 this.local_participant.projects.clear();
             }

             if let Some(participants) = remote_participants.log_err() {
                 for (participant, user) in room.participants.into_iter().zip(participants) {
                     // Participants without a peer id are skipped entirely.
                     let Some(peer_id) = participant.peer_id else {
                         continue;
                     };
                     let participant_index = ParticipantIndex(participant.participant_index);
                     this.participant_user_ids.insert(participant.user_id);

                     // Compare the projects previously known for this user with
                     // the ones in the update to detect shares and unshares.
                     let old_projects = this
                         .remote_participants
                         .get(&participant.user_id)
                         .into_iter()
                         .flat_map(|existing| &existing.projects)
                         .map(|project| project.id)
                         .collect::<HashSet<_>>();
                     let new_projects = participant
                         .projects
                         .iter()
                         .map(|project| project.id)
                         .collect::<HashSet<_>>();

                     for project in &participant.projects {
                         if !old_projects.contains(&project.id) {
                             cx.emit(Event::RemoteProjectShared {
                                 owner: user.clone(),
                                 project_id: project.id,
                                 worktree_root_names: project.worktree_root_names.clone(),
                             });
                         }
                     }

                     for unshared_project_id in old_projects.difference(&new_projects) {
                         // Disconnect and forget the joined project that was
                         // unshared; dropped projects are forgotten as well.
                         this.joined_projects.retain(|project| {
                             if let Some(project) = project.upgrade() {
                                 project.update(cx, |project, cx| {
                                     if project.remote_id() == Some(*unshared_project_id) {
                                         project.disconnected_from_host(cx);
                                         false
                                     } else {
                                         true
                                     }
                                 })
                             } else {
                                 false
                             }
                         });
                         cx.emit(Event::RemoteProjectUnshared {
                             project_id: *unshared_project_id,
                         });
                     }

                     let role = participant.role();
                     let location = ParticipantLocation::from_proto(participant.location)
                         .unwrap_or(ParticipantLocation::External);
                     if let Some(remote_participant) =
                         this.remote_participants.get_mut(&participant.user_id)
                     {
                         // Known participant: refresh its state in place.
                         remote_participant.peer_id = peer_id;
                         remote_participant.projects = participant.projects;
                         remote_participant.participant_index = participant_index;
                         if location != remote_participant.location
                             || role != remote_participant.role
                         {
                             remote_participant.location = location;
                             remote_participant.role = role;
                             cx.emit(Event::ParticipantLocationChanged {
                                 participant_id: peer_id,
                             });
                         }
                     } else {
                         // New participant: starts out muted and not speaking
                         // until LiveKit reports otherwise.
                         this.remote_participants.insert(
                             participant.user_id,
                             RemoteParticipant {
                                 user: user.clone(),
                                 participant_index,
                                 peer_id,
                                 projects: participant.projects,
                                 location,
                                 role,
                                 muted: true,
                                 speaking: false,
                                 video_tracks: Default::default(),
                                 audio_tracks: Default::default(),
                             },
                         );

                         Audio::play_sound(Sound::Joined, cx);

                         // Replay the tracks LiveKit already has for this user
                         // as if they had just been subscribed to.
                         if let Some(live_kit) = this.live_kit.as_ref() {
                             let video_tracks =
                                 live_kit.room.remote_video_tracks(&user.id.to_string());
                             let audio_tracks =
                                 live_kit.room.remote_audio_tracks(&user.id.to_string());
                             let publications = live_kit
                                 .room
                                 .remote_audio_track_publications(&user.id.to_string());

                             for track in video_tracks {
                                 this.live_kit_room_updated(
                                     RoomUpdate::SubscribedToRemoteVideoTrack(track),
                                     cx,
                                 )
                                 .log_err();
                             }

                             for (track, publication) in
                                 audio_tracks.iter().zip(publications.iter())
                             {
                                 this.live_kit_room_updated(
                                     RoomUpdate::SubscribedToRemoteAudioTrack(
                                         track.clone(),
                                         publication.clone(),
                                     ),
                                     cx,
                                 )
                                 .log_err();
                             }
                         }
                     }
                 }

                 // Drop participants missing from the update, reporting each of
                 // their projects as unshared.
                 this.remote_participants.retain(|user_id, participant| {
                     if this.participant_user_ids.contains(user_id) {
                         true
                     } else {
                         for project in &participant.projects {
                             cx.emit(Event::RemoteProjectUnshared {
                                 project_id: project.id,
                             });
                         }
                         false
                     }
                 });
             }

             if let Some(pending_participants) = pending_participants.log_err() {
                 this.pending_participants = pending_participants;
                 for participant in &this.pending_participants {
                     this.participant_user_ids.insert(participant.id);
                 }
             }

             // Rebuild the follower table from scratch, skipping duplicates.
             this.follows_by_leader_id_project_id.clear();
             for follower in room.followers {
                 let project_id = follower.project_id;
                 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                     (Some(leader), Some(follower)) => (leader, follower),

                     _ => {
                         log::error!("Follower message {follower:?} missing some state");
                         continue;
                     }
                 };

                 let list = this
                     .follows_by_leader_id_project_id
                     .entry((leader, project_id))
                     .or_default();
                 if !list.contains(&follower) {
                     list.push(follower);
                 }
             }

             // Cleared before `should_leave`, which requires that no room update
             // is pending.
             this.pending_room_update.take();
             if this.should_leave() {
                 log::info!("room is empty, leaving");
                 // NOTE(review): the result of leaving is dropped here, whereas
                 // `call` uses `detach_and_log_err` — confirm this is intended.
                 this.leave(cx).detach();
             }

             this.user_store.update(cx, |user_store, cx| {
                 let participant_indices_by_user_id = this
                     .remote_participants
                     .iter()
                     .map(|(user_id, participant)| (*user_id, participant.participant_index))
                     .collect();
                 user_store.set_participant_indices(participant_indices_by_user_id, cx);
             });

             this.check_invariants();
             this.room_update_completed_tx.try_send(Some(())).ok();
             cx.notify();
         })
         .ok();
     }));

     cx.notify();
     Ok(())
 }
968
969 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
970 let mut done_rx = self.room_update_completed_rx.clone();
971 async move {
972 while let Some(result) = done_rx.next().await {
973 if result.is_some() {
974 break;
975 }
976 }
977 }
978 }
979
980 fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
981 match update {
982 RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
983 let user_id = track.publisher_id().parse()?;
984 let track_id = track.sid().to_string();
985 let participant = self
986 .remote_participants
987 .get_mut(&user_id)
988 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
989 participant.video_tracks.insert(track_id.clone(), track);
990 cx.emit(Event::RemoteVideoTracksChanged {
991 participant_id: participant.peer_id,
992 });
993 }
994
995 RoomUpdate::UnsubscribedFromRemoteVideoTrack {
996 publisher_id,
997 track_id,
998 } => {
999 let user_id = publisher_id.parse()?;
1000 let participant = self
1001 .remote_participants
1002 .get_mut(&user_id)
1003 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1004 participant.video_tracks.remove(&track_id);
1005 cx.emit(Event::RemoteVideoTracksChanged {
1006 participant_id: participant.peer_id,
1007 });
1008 }
1009
1010 RoomUpdate::ActiveSpeakersChanged { speakers } => {
1011 let mut speaker_ids = speakers
1012 .into_iter()
1013 .filter_map(|speaker_sid| speaker_sid.parse().ok())
1014 .collect::<Vec<u64>>();
1015 speaker_ids.sort_unstable();
1016 for (sid, participant) in &mut self.remote_participants {
1017 participant.speaking = speaker_ids.binary_search(sid).is_ok();
1018 }
1019 if let Some(id) = self.client.user_id() {
1020 if let Some(room) = &mut self.live_kit {
1021 room.speaking = speaker_ids.binary_search(&id).is_ok();
1022 }
1023 }
1024 }
1025
1026 RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1027 let mut found = false;
1028 for participant in &mut self.remote_participants.values_mut() {
1029 for track in participant.audio_tracks.values() {
1030 if track.sid() == track_id {
1031 found = true;
1032 break;
1033 }
1034 }
1035 if found {
1036 participant.muted = muted;
1037 break;
1038 }
1039 }
1040 }
1041
1042 RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1043 if let Some(live_kit) = &self.live_kit {
1044 if live_kit.deafened {
1045 track.stop();
1046 cx.foreground_executor()
1047 .spawn(publication.set_enabled(false))
1048 .detach();
1049 }
1050 }
1051
1052 let user_id = track.publisher_id().parse()?;
1053 let track_id = track.sid().to_string();
1054 let participant = self
1055 .remote_participants
1056 .get_mut(&user_id)
1057 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1058 participant.audio_tracks.insert(track_id.clone(), track);
1059 participant.muted = publication.is_muted();
1060
1061 cx.emit(Event::RemoteAudioTracksChanged {
1062 participant_id: participant.peer_id,
1063 });
1064 }
1065
1066 RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1067 publisher_id,
1068 track_id,
1069 } => {
1070 let user_id = publisher_id.parse()?;
1071 let participant = self
1072 .remote_participants
1073 .get_mut(&user_id)
1074 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1075 participant.audio_tracks.remove(&track_id);
1076 cx.emit(Event::RemoteAudioTracksChanged {
1077 participant_id: participant.peer_id,
1078 });
1079 }
1080
1081 RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1082 log::info!("unpublished audio track {}", publication.sid());
1083 if let Some(room) = &mut self.live_kit {
1084 room.microphone_track = LocalTrack::None;
1085 }
1086 }
1087
1088 RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1089 log::info!("unpublished video track {}", publication.sid());
1090 if let Some(room) = &mut self.live_kit {
1091 room.screen_track = LocalTrack::None;
1092 }
1093 }
1094
1095 RoomUpdate::LocalAudioTrackPublished { publication } => {
1096 log::info!("published audio track {}", publication.sid());
1097 }
1098
1099 RoomUpdate::LocalVideoTrackPublished { publication } => {
1100 log::info!("published video track {}", publication.sid());
1101 }
1102 }
1103
1104 cx.notify();
1105 Ok(())
1106 }
1107
 /// In test builds, asserts that participant bookkeeping is consistent; a
 /// no-op otherwise.
 ///
 /// Checked invariants: every remote and pending participant is recorded in
 /// `participant_user_ids`, none of them is the local user, and
 /// `participant_user_ids` contains nothing else.
 fn check_invariants(&self) {
     #[cfg(any(test, feature = "test-support"))]
     {
         for participant in self.remote_participants.values() {
             assert!(self.participant_user_ids.contains(&participant.user.id));
             assert_ne!(participant.user.id, self.client.user_id().unwrap());
         }

         for participant in &self.pending_participants {
             assert!(self.participant_user_ids.contains(&participant.id));
             assert_ne!(participant.id, self.client.user_id().unwrap());
         }

         // Equal sizes imply the set holds exactly the ids checked above.
         assert_eq!(
             self.participant_user_ids.len(),
             self.remote_participants.len() + self.pending_participants.len()
         );
     }
 }
1127
1128 pub(crate) fn call(
1129 &mut self,
1130 called_user_id: u64,
1131 initial_project_id: Option<u64>,
1132 cx: &mut Context<Self>,
1133 ) -> Task<Result<()>> {
1134 if self.status.is_offline() {
1135 return Task::ready(Err(anyhow!("room is offline")));
1136 }
1137
1138 cx.notify();
1139 let client = self.client.clone();
1140 let room_id = self.id;
1141 self.pending_call_count += 1;
1142 cx.spawn(async move |this, cx| {
1143 let result = client
1144 .request(proto::Call {
1145 room_id,
1146 called_user_id,
1147 initial_project_id,
1148 })
1149 .await;
1150 this.update(cx, |this, cx| {
1151 this.pending_call_count -= 1;
1152 if this.should_leave() {
1153 this.leave(cx).detach_and_log_err(cx);
1154 }
1155 })?;
1156 result?;
1157 Ok(())
1158 })
1159 }
1160
1161 pub fn join_project(
1162 &mut self,
1163 id: u64,
1164 language_registry: Arc<LanguageRegistry>,
1165 fs: Arc<dyn Fs>,
1166 cx: &mut Context<Self>,
1167 ) -> Task<Result<Entity<Project>>> {
1168 let client = self.client.clone();
1169 let user_store = self.user_store.clone();
1170 cx.emit(Event::RemoteProjectJoined { project_id: id });
1171 cx.spawn(async move |this, cx| {
1172 let project =
1173 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1174
1175 this.update(cx, |this, cx| {
1176 this.joined_projects.retain(|project| {
1177 if let Some(project) = project.upgrade() {
1178 !project.read(cx).is_disconnected(cx)
1179 } else {
1180 false
1181 }
1182 });
1183 this.joined_projects.insert(project.downgrade());
1184 })?;
1185 Ok(project)
1186 })
1187 }
1188
1189 pub fn share_project(
1190 &mut self,
1191 project: Entity<Project>,
1192 cx: &mut Context<Self>,
1193 ) -> Task<Result<u64>> {
1194 if let Some(project_id) = project.read(cx).remote_id() {
1195 return Task::ready(Ok(project_id));
1196 }
1197
1198 let request = self.client.request(proto::ShareProject {
1199 room_id: self.id(),
1200 worktrees: project.read(cx).worktree_metadata_protos(cx),
1201 is_ssh_project: project.read(cx).is_via_ssh(),
1202 });
1203
1204 cx.spawn(async move |this, cx| {
1205 let response = request.await?;
1206
1207 project.update(cx, |project, cx| project.shared(response.project_id, cx))??;
1208
1209 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1210 this.update(cx, |this, cx| {
1211 this.shared_projects.insert(project.downgrade());
1212 let active_project = this.local_participant.active_project.as_ref();
1213 if active_project.map_or(false, |location| *location == project) {
1214 this.set_location(Some(&project), cx)
1215 } else {
1216 Task::ready(Ok(()))
1217 }
1218 })?
1219 .await?;
1220
1221 Ok(response.project_id)
1222 })
1223 }
1224
1225 pub(crate) fn unshare_project(
1226 &mut self,
1227 project: Entity<Project>,
1228 cx: &mut Context<Self>,
1229 ) -> Result<()> {
1230 let project_id = match project.read(cx).remote_id() {
1231 Some(project_id) => project_id,
1232 None => return Ok(()),
1233 };
1234
1235 self.client.send(proto::UnshareProject { project_id })?;
1236 project.update(cx, |this, cx| this.unshare(cx))?;
1237
1238 if self.local_participant.active_project == Some(project.downgrade()) {
1239 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1240 }
1241 Ok(())
1242 }
1243
1244 pub(crate) fn set_location(
1245 &mut self,
1246 project: Option<&Entity<Project>>,
1247 cx: &mut Context<Self>,
1248 ) -> Task<Result<()>> {
1249 if self.status.is_offline() {
1250 return Task::ready(Err(anyhow!("room is offline")));
1251 }
1252
1253 let client = self.client.clone();
1254 let room_id = self.id;
1255 let location = if let Some(project) = project {
1256 self.local_participant.active_project = Some(project.downgrade());
1257 if let Some(project_id) = project.read(cx).remote_id() {
1258 proto::participant_location::Variant::SharedProject(
1259 proto::participant_location::SharedProject { id: project_id },
1260 )
1261 } else {
1262 proto::participant_location::Variant::UnsharedProject(
1263 proto::participant_location::UnsharedProject {},
1264 )
1265 }
1266 } else {
1267 self.local_participant.active_project = None;
1268 proto::participant_location::Variant::External(proto::participant_location::External {})
1269 };
1270
1271 cx.notify();
1272 cx.background_spawn(async move {
1273 client
1274 .request(proto::UpdateParticipantLocation {
1275 room_id,
1276 location: Some(proto::ParticipantLocation {
1277 variant: Some(location),
1278 }),
1279 })
1280 .await?;
1281 Ok(())
1282 })
1283 }
1284
1285 pub fn is_screen_sharing(&self) -> bool {
1286 self.live_kit.as_ref().map_or(false, |live_kit| {
1287 !matches!(live_kit.screen_track, LocalTrack::None)
1288 })
1289 }
1290
1291 pub fn is_sharing_mic(&self) -> bool {
1292 self.live_kit.as_ref().map_or(false, |live_kit| {
1293 !matches!(live_kit.microphone_track, LocalTrack::None)
1294 })
1295 }
1296
1297 pub fn is_muted(&self) -> bool {
1298 self.live_kit.as_ref().map_or(false, |live_kit| {
1299 matches!(live_kit.microphone_track, LocalTrack::None)
1300 || live_kit.muted_by_user
1301 || live_kit.deafened
1302 })
1303 }
1304
1305 pub fn muted_by_user(&self) -> bool {
1306 self.live_kit
1307 .as_ref()
1308 .map_or(false, |live_kit| live_kit.muted_by_user)
1309 }
1310
1311 pub fn is_speaking(&self) -> bool {
1312 self.live_kit
1313 .as_ref()
1314 .map_or(false, |live_kit| live_kit.speaking)
1315 }
1316
1317 pub fn is_deafened(&self) -> Option<bool> {
1318 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1319 }
1320
1321 pub fn can_use_microphone(&self) -> bool {
1322 use proto::ChannelRole::*;
1323 match self.local_participant.role {
1324 Admin | Member | Talker => true,
1325 Guest | Banned => false,
1326 }
1327 }
1328
1329 pub fn can_share_projects(&self) -> bool {
1330 use proto::ChannelRole::*;
1331 match self.local_participant.role {
1332 Admin | Member => true,
1333 Guest | Banned | Talker => false,
1334 }
1335 }
1336
1337 #[track_caller]
1338 pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1339 if self.status.is_offline() {
1340 return Task::ready(Err(anyhow!("room is offline")));
1341 }
1342
1343 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1344 let publish_id = post_inc(&mut live_kit.next_publish_id);
1345 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1346 cx.notify();
1347 publish_id
1348 } else {
1349 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1350 };
1351
1352 cx.spawn(async move |this, cx| {
1353 let publish_track = async {
1354 let track = LocalAudioTrack::create();
1355 this.upgrade()
1356 .ok_or_else(|| anyhow!("room was dropped"))?
1357 .update(cx, |this, _| {
1358 this.live_kit
1359 .as_ref()
1360 .map(|live_kit| live_kit.room.publish_audio_track(track))
1361 })?
1362 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1363 .await
1364 };
1365 let publication = publish_track.await;
1366 this.upgrade()
1367 .ok_or_else(|| anyhow!("room was dropped"))?
1368 .update(cx, |this, cx| {
1369 let live_kit = this
1370 .live_kit
1371 .as_mut()
1372 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1373
1374 let canceled = if let LocalTrack::Pending {
1375 publish_id: cur_publish_id,
1376 } = &live_kit.microphone_track
1377 {
1378 *cur_publish_id != publish_id
1379 } else {
1380 true
1381 };
1382
1383 match publication {
1384 Ok(publication) => {
1385 if canceled {
1386 live_kit.room.unpublish_track(publication);
1387 } else {
1388 if live_kit.muted_by_user || live_kit.deafened {
1389 cx.background_spawn(publication.set_mute(true)).detach();
1390 }
1391 live_kit.microphone_track = LocalTrack::Published {
1392 track_publication: publication,
1393 };
1394 cx.notify();
1395 }
1396 Ok(())
1397 }
1398 Err(error) => {
1399 if canceled {
1400 Ok(())
1401 } else {
1402 live_kit.microphone_track = LocalTrack::None;
1403 cx.notify();
1404 Err(error)
1405 }
1406 }
1407 }
1408 })?
1409 })
1410 }
1411
1412 pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1413 if self.status.is_offline() {
1414 return Task::ready(Err(anyhow!("room is offline")));
1415 } else if self.is_screen_sharing() {
1416 return Task::ready(Err(anyhow!("screen was already shared")));
1417 }
1418
1419 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1420 let publish_id = post_inc(&mut live_kit.next_publish_id);
1421 live_kit.screen_track = LocalTrack::Pending { publish_id };
1422 cx.notify();
1423 (live_kit.room.display_sources(), publish_id)
1424 } else {
1425 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1426 };
1427
1428 cx.spawn(async move |this, cx| {
1429 let publish_track = async {
1430 let displays = displays.await?;
1431 let display = displays
1432 .first()
1433 .ok_or_else(|| anyhow!("no display found"))?;
1434 let track = LocalVideoTrack::screen_share_for_display(display);
1435 this.upgrade()
1436 .ok_or_else(|| anyhow!("room was dropped"))?
1437 .update(cx, |this, _| {
1438 this.live_kit
1439 .as_ref()
1440 .map(|live_kit| live_kit.room.publish_video_track(track))
1441 })?
1442 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1443 .await
1444 };
1445
1446 let publication = publish_track.await;
1447 this.upgrade()
1448 .ok_or_else(|| anyhow!("room was dropped"))?
1449 .update(cx, |this, cx| {
1450 let live_kit = this
1451 .live_kit
1452 .as_mut()
1453 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1454
1455 let canceled = if let LocalTrack::Pending {
1456 publish_id: cur_publish_id,
1457 } = &live_kit.screen_track
1458 {
1459 *cur_publish_id != publish_id
1460 } else {
1461 true
1462 };
1463
1464 match publication {
1465 Ok(publication) => {
1466 if canceled {
1467 live_kit.room.unpublish_track(publication);
1468 } else {
1469 live_kit.screen_track = LocalTrack::Published {
1470 track_publication: publication,
1471 };
1472 cx.notify();
1473 }
1474
1475 Audio::play_sound(Sound::StartScreenshare, cx);
1476
1477 Ok(())
1478 }
1479 Err(error) => {
1480 if canceled {
1481 Ok(())
1482 } else {
1483 live_kit.screen_track = LocalTrack::None;
1484 cx.notify();
1485 Err(error)
1486 }
1487 }
1488 }
1489 })?
1490 })
1491 }
1492
1493 pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1494 if let Some(live_kit) = self.live_kit.as_mut() {
1495 // When unmuting, undeafen if the user was deafened before.
1496 let was_deafened = live_kit.deafened;
1497 if live_kit.muted_by_user
1498 || live_kit.deafened
1499 || matches!(live_kit.microphone_track, LocalTrack::None)
1500 {
1501 live_kit.muted_by_user = false;
1502 live_kit.deafened = false;
1503 } else {
1504 live_kit.muted_by_user = true;
1505 }
1506 let muted = live_kit.muted_by_user;
1507 let should_undeafen = was_deafened && !live_kit.deafened;
1508
1509 if let Some(task) = self.set_mute(muted, cx) {
1510 task.detach_and_log_err(cx);
1511 }
1512
1513 if should_undeafen {
1514 if let Some(task) = self.set_deafened(false, cx) {
1515 task.detach_and_log_err(cx);
1516 }
1517 }
1518 }
1519 }
1520
1521 pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1522 if let Some(live_kit) = self.live_kit.as_mut() {
1523 // When deafening, mute the microphone if it was not already muted.
1524 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1525 let deafened = !live_kit.deafened;
1526 live_kit.deafened = deafened;
1527 let should_change_mute = !live_kit.muted_by_user;
1528
1529 if let Some(task) = self.set_deafened(deafened, cx) {
1530 task.detach_and_log_err(cx);
1531 }
1532
1533 if should_change_mute {
1534 if let Some(task) = self.set_mute(deafened, cx) {
1535 task.detach_and_log_err(cx);
1536 }
1537 }
1538 }
1539 }
1540
1541 pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1542 if self.status.is_offline() {
1543 return Err(anyhow!("room is offline"));
1544 }
1545
1546 let live_kit = self
1547 .live_kit
1548 .as_mut()
1549 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1550 match mem::take(&mut live_kit.screen_track) {
1551 LocalTrack::None => Err(anyhow!("screen was not shared")),
1552 LocalTrack::Pending { .. } => {
1553 cx.notify();
1554 Ok(())
1555 }
1556 LocalTrack::Published {
1557 track_publication, ..
1558 } => {
1559 live_kit.room.unpublish_track(track_publication);
1560 cx.notify();
1561
1562 Audio::play_sound(Sound::StopScreenshare, cx);
1563 Ok(())
1564 }
1565 }
1566 }
1567
1568 fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1569 let live_kit = self.live_kit.as_mut()?;
1570 cx.notify();
1571
1572 let mut track_updates = Vec::new();
1573 for participant in self.remote_participants.values() {
1574 for publication in live_kit
1575 .room
1576 .remote_audio_track_publications(&participant.user.id.to_string())
1577 {
1578 track_updates.push(publication.set_enabled(!deafened));
1579 }
1580
1581 for track in participant.audio_tracks.values() {
1582 if deafened {
1583 track.stop();
1584 } else {
1585 track.start();
1586 }
1587 }
1588 }
1589
1590 Some(cx.foreground_executor().spawn(async move {
1591 for result in futures::future::join_all(track_updates).await {
1592 result?;
1593 }
1594 Ok(())
1595 }))
1596 }
1597
1598 fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1599 let live_kit = self.live_kit.as_mut()?;
1600 cx.notify();
1601
1602 if should_mute {
1603 Audio::play_sound(Sound::Mute, cx);
1604 } else {
1605 Audio::play_sound(Sound::Unmute, cx);
1606 }
1607
1608 match &mut live_kit.microphone_track {
1609 LocalTrack::None => {
1610 if should_mute {
1611 None
1612 } else {
1613 Some(self.share_microphone(cx))
1614 }
1615 }
1616 LocalTrack::Pending { .. } => None,
1617 LocalTrack::Published { track_publication } => Some(
1618 cx.foreground_executor()
1619 .spawn(track_publication.set_mute(should_mute)),
1620 ),
1621 }
1622 }
1623
/// Test helper that forwards `sources` to the underlying live-kit room as its
/// display sources.
///
/// # Panics
///
/// Panics if live-kit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1632}
1633
1634struct LiveKitRoom {
1635 room: Arc<livekit_client_macos::Room>,
1636 screen_track: LocalTrack,
1637 microphone_track: LocalTrack,
1638 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1639 muted_by_user: bool,
1640 deafened: bool,
1641 speaking: bool,
1642 next_publish_id: usize,
1643 _maintain_room: Task<()>,
1644 _handle_updates: Task<()>,
1645}
1646
1647impl LiveKitRoom {
1648 fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1649 if let LocalTrack::Published {
1650 track_publication, ..
1651 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1652 {
1653 self.room.unpublish_track(track_publication);
1654 cx.notify();
1655 }
1656
1657 if let LocalTrack::Published {
1658 track_publication, ..
1659 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1660 {
1661 self.room.unpublish_track(track_publication);
1662 cx.notify();
1663 }
1664 }
1665}
1666
1667enum LocalTrack {
1668 None,
1669 Pending {
1670 publish_id: usize,
1671 },
1672 Published {
1673 track_publication: LocalTrackPublication,
1674 },
1675}
1676
1677impl Default for LocalTrack {
1678 fn default() -> Self {
1679 Self::None
1680 }
1681}
1682
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; a rejoining room is
    /// neither online nor offline.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}