1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
15use language::LanguageRegistry;
16use livekit_client_macos::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
17use postage::{sink::Sink, stream::Stream, watch};
18use project::Project;
19use settings::Settings as _;
20use std::{future::Future, mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
/// Upper bound on how long `Room::maintain_connection` waits for the client to reconnect
/// and the room to be rejoined before giving up and leaving the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub enum Event {
27 RoomJoined {
28 channel_id: Option<ChannelId>,
29 },
30 ParticipantLocationChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteVideoTracksChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteAudioTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteProjectShared {
40 owner: Arc<User>,
41 project_id: u64,
42 worktree_root_names: Vec<String>,
43 },
44 RemoteProjectUnshared {
45 project_id: u64,
46 },
47 RemoteProjectJoined {
48 project_id: u64,
49 },
50 RemoteProjectInvitationDiscarded {
51 project_id: u64,
52 },
53 RoomLeft {
54 channel_id: Option<ChannelId>,
55 },
56}
57
58pub struct Room {
59 id: u64,
60 channel_id: Option<ChannelId>,
61 live_kit: Option<LiveKitRoom>,
62 status: RoomStatus,
63 shared_projects: HashSet<WeakEntity<Project>>,
64 joined_projects: HashSet<WeakEntity<Project>>,
65 local_participant: LocalParticipant,
66 remote_participants: BTreeMap<u64, RemoteParticipant>,
67 pending_participants: Vec<Arc<User>>,
68 participant_user_ids: HashSet<u64>,
69 pending_call_count: usize,
70 leave_when_empty: bool,
71 client: Arc<Client>,
72 user_store: Entity<UserStore>,
73 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
74 client_subscriptions: Vec<client::Subscription>,
75 _subscriptions: Vec<gpui::Subscription>,
76 room_update_completed_tx: watch::Sender<Option<()>>,
77 room_update_completed_rx: watch::Receiver<Option<()>>,
78 pending_room_update: Option<Task<()>>,
79 maintain_connection: Option<Task<Option<()>>>,
80}
81
82impl EventEmitter<Event> for Room {}
83
84impl Room {
85 pub fn channel_id(&self) -> Option<ChannelId> {
86 self.channel_id
87 }
88
89 pub fn is_sharing_project(&self) -> bool {
90 !self.shared_projects.is_empty()
91 }
92
/// Test helper: reports whether the LiveKit room currently has a `Connected` status.
///
/// Returns `false` when the room has no LiveKit state at all.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    let Some(live_kit) = self.live_kit.as_ref() else {
        return false;
    };
    matches!(
        *live_kit.room.status().borrow(),
        livekit_client_macos::ConnectionState::Connected { .. }
    )
}
104
105 fn new(
106 id: u64,
107 channel_id: Option<ChannelId>,
108 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
109 client: Arc<Client>,
110 user_store: Entity<UserStore>,
111 cx: &mut Context<Self>,
112 ) -> Self {
113 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
114 let room = livekit_client_macos::Room::new();
115 let mut status = room.status();
116 // Consume the initial status of the room.
117 let _ = status.try_recv();
118 let _maintain_room = cx.spawn(async move |this, cx| {
119 while let Some(status) = status.next().await {
120 let this = if let Some(this) = this.upgrade() {
121 this
122 } else {
123 break;
124 };
125
126 if status == livekit_client_macos::ConnectionState::Disconnected {
127 this.update(cx, |this, cx| this.leave(cx).log_err()).ok();
128 break;
129 }
130 }
131 });
132
133 let _handle_updates = cx.spawn({
134 let room = room.clone();
135 async move |this, cx| {
136 let mut updates = room.updates();
137 while let Some(update) = updates.next().await {
138 let this = if let Some(this) = this.upgrade() {
139 this
140 } else {
141 break;
142 };
143
144 this.update(cx, |this, cx| {
145 this.live_kit_room_updated(update, cx).log_err()
146 })
147 .ok();
148 }
149 }
150 });
151
152 let connect = room.connect(&connection_info.server_url, &connection_info.token);
153 cx.spawn(async move |this, cx| {
154 connect.await?;
155 this.update(cx, |this, cx| {
156 if this.can_use_microphone() {
157 if let Some(live_kit) = &this.live_kit {
158 if !live_kit.muted_by_user && !live_kit.deafened {
159 return this.share_microphone(cx);
160 }
161 }
162 }
163 Task::ready(Ok(()))
164 })?
165 .await
166 })
167 .detach_and_log_err(cx);
168
169 Some(LiveKitRoom {
170 room,
171 screen_track: LocalTrack::None,
172 microphone_track: LocalTrack::None,
173 next_publish_id: 0,
174 muted_by_user: Self::mute_on_join(cx),
175 deafened: false,
176 speaking: false,
177 _maintain_room,
178 _handle_updates,
179 })
180 } else {
181 None
182 };
183
184 let maintain_connection = cx.spawn({
185 let client = client.clone();
186 async move |this, cx| {
187 Self::maintain_connection(this, client.clone(), cx)
188 .log_err()
189 .await
190 }
191 });
192
193 Audio::play_sound(Sound::Joined, cx);
194
195 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
196
197 Self {
198 id,
199 channel_id,
200 live_kit: live_kit_room,
201 status: RoomStatus::Online,
202 shared_projects: Default::default(),
203 joined_projects: Default::default(),
204 participant_user_ids: Default::default(),
205 local_participant: Default::default(),
206 remote_participants: Default::default(),
207 pending_participants: Default::default(),
208 pending_call_count: 0,
209 client_subscriptions: vec![
210 client.add_message_handler(cx.weak_entity(), Self::handle_room_updated)
211 ],
212 _subscriptions: vec![
213 cx.on_release(Self::released),
214 cx.on_app_quit(Self::app_will_quit),
215 ],
216 leave_when_empty: false,
217 pending_room_update: None,
218 client,
219 user_store,
220 follows_by_leader_id_project_id: Default::default(),
221 maintain_connection: Some(maintain_connection),
222 room_update_completed_tx,
223 room_update_completed_rx,
224 }
225 }
226
227 pub(crate) fn create(
228 called_user_id: u64,
229 initial_project: Option<Entity<Project>>,
230 client: Arc<Client>,
231 user_store: Entity<UserStore>,
232 cx: &mut App,
233 ) -> Task<Result<Entity<Self>>> {
234 cx.spawn(async move |cx| {
235 let response = client.request(proto::CreateRoom {}).await?;
236 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
237 let room = cx.new(|cx| {
238 let mut room = Self::new(
239 room_proto.id,
240 None,
241 response.live_kit_connection_info,
242 client,
243 user_store,
244 cx,
245 );
246 if let Some(participant) = room_proto.participants.first() {
247 room.local_participant.role = participant.role()
248 }
249 room
250 })?;
251
252 let initial_project_id = if let Some(initial_project) = initial_project {
253 let initial_project_id = room
254 .update(cx, |room, cx| {
255 room.share_project(initial_project.clone(), cx)
256 })?
257 .await?;
258 Some(initial_project_id)
259 } else {
260 None
261 };
262
263 let did_join = room
264 .update(cx, |room, cx| {
265 room.leave_when_empty = true;
266 room.call(called_user_id, initial_project_id, cx)
267 })?
268 .await;
269 match did_join {
270 Ok(()) => Ok(room),
271 Err(error) => Err(error.context("room creation failed")),
272 }
273 })
274 }
275
276 pub(crate) async fn join_channel(
277 channel_id: ChannelId,
278 client: Arc<Client>,
279 user_store: Entity<UserStore>,
280 cx: &mut AsyncApp,
281 ) -> Result<Entity<Self>> {
282 Self::from_join_response(
283 client
284 .request(proto::JoinChannel {
285 channel_id: channel_id.0,
286 })
287 .await?,
288 client,
289 user_store,
290 cx,
291 )
292 }
293
294 pub(crate) async fn join(
295 room_id: u64,
296 client: Arc<Client>,
297 user_store: Entity<UserStore>,
298 cx: &mut AsyncApp,
299 ) -> Result<Entity<Self>> {
300 Self::from_join_response(
301 client.request(proto::JoinRoom { id: room_id }).await?,
302 client,
303 user_store,
304 cx,
305 )
306 }
307
308 fn released(&mut self, cx: &mut App) {
309 if self.status.is_online() {
310 self.leave_internal(cx).detach_and_log_err(cx);
311 }
312 }
313
314 fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> {
315 let task = if self.status.is_online() {
316 let leave = self.leave_internal(cx);
317 Some(cx.background_spawn(async move {
318 leave.await.log_err();
319 }))
320 } else {
321 None
322 };
323
324 async move {
325 if let Some(task) = task {
326 task.await;
327 }
328 }
329 }
330
331 pub fn mute_on_join(cx: &App) -> bool {
332 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
333 }
334
335 fn from_join_response(
336 response: proto::JoinRoomResponse,
337 client: Arc<Client>,
338 user_store: Entity<UserStore>,
339 cx: &mut AsyncApp,
340 ) -> Result<Entity<Self>> {
341 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
342 let room = cx.new(|cx| {
343 Self::new(
344 room_proto.id,
345 response.channel_id.map(ChannelId),
346 response.live_kit_connection_info,
347 client,
348 user_store,
349 cx,
350 )
351 })?;
352 room.update(cx, |room, cx| {
353 room.leave_when_empty = room.channel_id.is_none();
354 room.apply_room_update(room_proto, cx)?;
355 anyhow::Ok(())
356 })??;
357 Ok(room)
358 }
359
360 fn should_leave(&self) -> bool {
361 self.leave_when_empty
362 && self.pending_room_update.is_none()
363 && self.pending_participants.is_empty()
364 && self.remote_participants.is_empty()
365 && self.pending_call_count == 0
366 }
367
368 pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
369 cx.notify();
370 self.leave_internal(cx)
371 }
372
373 fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
374 if self.status.is_offline() {
375 return Task::ready(Err(anyhow!("room is offline")));
376 }
377
378 log::info!("leaving room");
379 Audio::play_sound(Sound::Leave, cx);
380
381 self.clear_state(cx);
382
383 let leave_room = self.client.request(proto::LeaveRoom {});
384 cx.background_spawn(async move {
385 leave_room.await?;
386 anyhow::Ok(())
387 })
388 }
389
390 pub(crate) fn clear_state(&mut self, cx: &mut App) {
391 for project in self.shared_projects.drain() {
392 if let Some(project) = project.upgrade() {
393 project.update(cx, |project, cx| {
394 project.unshare(cx).log_err();
395 });
396 }
397 }
398 for project in self.joined_projects.drain() {
399 if let Some(project) = project.upgrade() {
400 project.update(cx, |project, cx| {
401 project.disconnected_from_host(cx);
402 project.close(cx);
403 });
404 }
405 }
406
407 self.status = RoomStatus::Offline;
408 self.remote_participants.clear();
409 self.pending_participants.clear();
410 self.participant_user_ids.clear();
411 self.client_subscriptions.clear();
412 self.live_kit.take();
413 self.pending_room_update.take();
414 self.maintain_connection.take();
415 }
416
417 async fn maintain_connection(
418 this: WeakEntity<Self>,
419 client: Arc<Client>,
420 cx: &mut AsyncApp,
421 ) -> Result<()> {
422 let mut client_status = client.status();
423 loop {
424 let _ = client_status.try_recv();
425 let is_connected = client_status.borrow().is_connected();
426 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
427 if !is_connected || client_status.next().await.is_some() {
428 log::info!("detected client disconnection");
429
430 this.upgrade()
431 .ok_or_else(|| anyhow!("room was dropped"))?
432 .update(cx, |this, cx| {
433 this.status = RoomStatus::Rejoining;
434 cx.notify();
435 })?;
436
437 // Wait for client to re-establish a connection to the server.
438 {
439 let mut reconnection_timeout =
440 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
441 let client_reconnection = async {
442 let mut remaining_attempts = 3;
443 while remaining_attempts > 0 {
444 if client_status.borrow().is_connected() {
445 log::info!("client reconnected, attempting to rejoin room");
446
447 let Some(this) = this.upgrade() else { break };
448 match this.update(cx, |this, cx| this.rejoin(cx)) {
449 Ok(task) => {
450 if task.await.log_err().is_some() {
451 return true;
452 } else {
453 remaining_attempts -= 1;
454 }
455 }
456 Err(_app_dropped) => return false,
457 }
458 } else if client_status.borrow().is_signed_out() {
459 return false;
460 }
461
462 log::info!(
463 "waiting for client status change, remaining attempts {}",
464 remaining_attempts
465 );
466 client_status.next().await;
467 }
468 false
469 }
470 .fuse();
471 futures::pin_mut!(client_reconnection);
472
473 futures::select_biased! {
474 reconnected = client_reconnection => {
475 if reconnected {
476 log::info!("successfully reconnected to room");
477 // If we successfully joined the room, go back around the loop
478 // waiting for future connection status changes.
479 continue;
480 }
481 }
482 _ = reconnection_timeout => {
483 log::info!("room reconnection timeout expired");
484 }
485 }
486 }
487
488 break;
489 }
490 }
491
492 // The client failed to re-establish a connection to the server
493 // or an error occurred while trying to re-join the room. Either way
494 // we leave the room and return an error.
495 if let Some(this) = this.upgrade() {
496 log::info!("reconnection failed, leaving room");
497 this.update(cx, |this, cx| this.leave(cx))?.await?;
498 }
499 Err(anyhow!(
500 "can't reconnect to room: client failed to re-establish connection"
501 ))
502 }
503
504 fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
505 let mut projects = HashMap::default();
506 let mut reshared_projects = Vec::new();
507 let mut rejoined_projects = Vec::new();
508 self.shared_projects.retain(|project| {
509 if let Some(handle) = project.upgrade() {
510 let project = handle.read(cx);
511 if let Some(project_id) = project.remote_id() {
512 projects.insert(project_id, handle.clone());
513 reshared_projects.push(proto::UpdateProject {
514 project_id,
515 worktrees: project.worktree_metadata_protos(cx),
516 });
517 return true;
518 }
519 }
520 false
521 });
522 self.joined_projects.retain(|project| {
523 if let Some(handle) = project.upgrade() {
524 let project = handle.read(cx);
525 if let Some(project_id) = project.remote_id() {
526 projects.insert(project_id, handle.clone());
527 let mut worktrees = Vec::new();
528 let mut repositories = Vec::new();
529 for worktree in project.worktrees(cx) {
530 let worktree = worktree.read(cx);
531 worktrees.push(proto::RejoinWorktree {
532 id: worktree.id().to_proto(),
533 scan_id: worktree.completed_scan_id() as u64,
534 });
535 }
536 for (entry_id, repository) in project.repositories(cx) {
537 let repository = repository.read(cx);
538 repositories.push(proto::RejoinRepository {
539 id: entry_id.to_proto(),
540 scan_id: repository.completed_scan_id as u64,
541 });
542 }
543
544 rejoined_projects.push(proto::RejoinProject {
545 id: project_id,
546 worktrees,
547 repositories,
548 });
549 }
550 return true;
551 }
552 false
553 });
554
555 let response = self.client.request_envelope(proto::RejoinRoom {
556 id: self.id,
557 reshared_projects,
558 rejoined_projects,
559 });
560
561 cx.spawn(async move |this, cx| {
562 let response = response.await?;
563 let message_id = response.message_id;
564 let response = response.payload;
565 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
566 this.update(cx, |this, cx| {
567 this.status = RoomStatus::Online;
568 this.apply_room_update(room_proto, cx)?;
569
570 for reshared_project in response.reshared_projects {
571 if let Some(project) = projects.get(&reshared_project.id) {
572 project.update(cx, |project, cx| {
573 project.reshared(reshared_project, cx).log_err();
574 });
575 }
576 }
577
578 for rejoined_project in response.rejoined_projects {
579 if let Some(project) = projects.get(&rejoined_project.id) {
580 project.update(cx, |project, cx| {
581 project.rejoined(rejoined_project, message_id, cx).log_err();
582 });
583 }
584 }
585
586 anyhow::Ok(())
587 })?
588 })
589 }
590
591 pub fn id(&self) -> u64 {
592 self.id
593 }
594
595 pub fn status(&self) -> RoomStatus {
596 self.status
597 }
598
599 pub fn local_participant(&self) -> &LocalParticipant {
600 &self.local_participant
601 }
602
603 pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
604 self.user_store.read(cx).current_user()
605 }
606
607 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
608 &self.remote_participants
609 }
610
611 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
612 self.remote_participants
613 .values()
614 .find(|p| p.peer_id == peer_id)
615 }
616
617 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
618 self.remote_participants
619 .get(&user_id)
620 .map(|participant| participant.role)
621 }
622
623 pub fn contains_guests(&self) -> bool {
624 self.local_participant.role == proto::ChannelRole::Guest
625 || self
626 .remote_participants
627 .values()
628 .any(|p| p.role == proto::ChannelRole::Guest)
629 }
630
631 pub fn local_participant_is_admin(&self) -> bool {
632 self.local_participant.role == proto::ChannelRole::Admin
633 }
634
635 pub fn local_participant_is_guest(&self) -> bool {
636 self.local_participant.role == proto::ChannelRole::Guest
637 }
638
639 pub fn set_participant_role(
640 &mut self,
641 user_id: u64,
642 role: proto::ChannelRole,
643 cx: &Context<Self>,
644 ) -> Task<Result<()>> {
645 let client = self.client.clone();
646 let room_id = self.id;
647 let role = role.into();
648 cx.spawn(async move |_, _| {
649 client
650 .request(proto::SetRoomParticipantRole {
651 room_id,
652 user_id,
653 role,
654 })
655 .await
656 .map(|_| ())
657 })
658 }
659
660 pub fn pending_participants(&self) -> &[Arc<User>] {
661 &self.pending_participants
662 }
663
664 pub fn contains_participant(&self, user_id: u64) -> bool {
665 self.participant_user_ids.contains(&user_id)
666 }
667
668 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
669 self.follows_by_leader_id_project_id
670 .get(&(leader_id, project_id))
671 .map_or(&[], |v| v.as_slice())
672 }
673
674 /// Returns the most 'active' projects, defined as most people in the project
675 pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
676 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
677 for participant in self.remote_participants.values() {
678 match participant.location {
679 ParticipantLocation::SharedProject { project_id } => {
680 project_hosts_and_guest_counts
681 .entry(project_id)
682 .or_default()
683 .1 += 1;
684 }
685 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
686 }
687 for project in &participant.projects {
688 project_hosts_and_guest_counts
689 .entry(project.id)
690 .or_default()
691 .0 = Some(participant.user.id);
692 }
693 }
694
695 if let Some(user) = self.user_store.read(cx).current_user() {
696 for project in &self.local_participant.projects {
697 project_hosts_and_guest_counts
698 .entry(project.id)
699 .or_default()
700 .0 = Some(user.id);
701 }
702 }
703
704 project_hosts_and_guest_counts
705 .into_iter()
706 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
707 .max_by_key(|(_, _, guest_count)| *guest_count)
708 .map(|(id, host, _)| (id, host))
709 }
710
711 async fn handle_room_updated(
712 this: Entity<Self>,
713 envelope: TypedEnvelope<proto::RoomUpdated>,
714 mut cx: AsyncApp,
715 ) -> Result<()> {
716 let room = envelope
717 .payload
718 .room
719 .ok_or_else(|| anyhow!("invalid room"))?;
720 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
721 }
722
723 fn apply_room_update(&mut self, mut room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
724 // Filter ourselves out from the room's participants.
725 let local_participant_ix = room
726 .participants
727 .iter()
728 .position(|participant| Some(participant.user_id) == self.client.user_id());
729 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
730
731 let pending_participant_user_ids = room
732 .pending_participants
733 .iter()
734 .map(|p| p.user_id)
735 .collect::<Vec<_>>();
736
737 let remote_participant_user_ids = room
738 .participants
739 .iter()
740 .map(|p| p.user_id)
741 .collect::<Vec<_>>();
742
743 let (remote_participants, pending_participants) =
744 self.user_store.update(cx, move |user_store, cx| {
745 (
746 user_store.get_users(remote_participant_user_ids, cx),
747 user_store.get_users(pending_participant_user_ids, cx),
748 )
749 });
750
751 self.pending_room_update = Some(cx.spawn(async move |this, cx| {
752 let (remote_participants, pending_participants) =
753 futures::join!(remote_participants, pending_participants);
754
755 this.update(cx, |this, cx| {
756 this.participant_user_ids.clear();
757
758 if let Some(participant) = local_participant {
759 let role = participant.role();
760 this.local_participant.projects = participant.projects;
761 if this.local_participant.role != role {
762 this.local_participant.role = role;
763
764 if role == proto::ChannelRole::Guest {
765 for project in mem::take(&mut this.shared_projects) {
766 if let Some(project) = project.upgrade() {
767 this.unshare_project(project, cx).log_err();
768 }
769 }
770 this.local_participant.projects.clear();
771 if let Some(live_kit_room) = &mut this.live_kit {
772 live_kit_room.stop_publishing(cx);
773 }
774 }
775
776 this.joined_projects.retain(|project| {
777 if let Some(project) = project.upgrade() {
778 project.update(cx, |project, cx| project.set_role(role, cx));
779 true
780 } else {
781 false
782 }
783 });
784 }
785 } else {
786 this.local_participant.projects.clear();
787 }
788
789 if let Some(participants) = remote_participants.log_err() {
790 for (participant, user) in room.participants.into_iter().zip(participants) {
791 let Some(peer_id) = participant.peer_id else {
792 continue;
793 };
794 let participant_index = ParticipantIndex(participant.participant_index);
795 this.participant_user_ids.insert(participant.user_id);
796
797 let old_projects = this
798 .remote_participants
799 .get(&participant.user_id)
800 .into_iter()
801 .flat_map(|existing| &existing.projects)
802 .map(|project| project.id)
803 .collect::<HashSet<_>>();
804 let new_projects = participant
805 .projects
806 .iter()
807 .map(|project| project.id)
808 .collect::<HashSet<_>>();
809
810 for project in &participant.projects {
811 if !old_projects.contains(&project.id) {
812 cx.emit(Event::RemoteProjectShared {
813 owner: user.clone(),
814 project_id: project.id,
815 worktree_root_names: project.worktree_root_names.clone(),
816 });
817 }
818 }
819
820 for unshared_project_id in old_projects.difference(&new_projects) {
821 this.joined_projects.retain(|project| {
822 if let Some(project) = project.upgrade() {
823 project.update(cx, |project, cx| {
824 if project.remote_id() == Some(*unshared_project_id) {
825 project.disconnected_from_host(cx);
826 false
827 } else {
828 true
829 }
830 })
831 } else {
832 false
833 }
834 });
835 cx.emit(Event::RemoteProjectUnshared {
836 project_id: *unshared_project_id,
837 });
838 }
839
840 let role = participant.role();
841 let location = ParticipantLocation::from_proto(participant.location)
842 .unwrap_or(ParticipantLocation::External);
843 if let Some(remote_participant) =
844 this.remote_participants.get_mut(&participant.user_id)
845 {
846 remote_participant.peer_id = peer_id;
847 remote_participant.projects = participant.projects;
848 remote_participant.participant_index = participant_index;
849 if location != remote_participant.location
850 || role != remote_participant.role
851 {
852 remote_participant.location = location;
853 remote_participant.role = role;
854 cx.emit(Event::ParticipantLocationChanged {
855 participant_id: peer_id,
856 });
857 }
858 } else {
859 this.remote_participants.insert(
860 participant.user_id,
861 RemoteParticipant {
862 user: user.clone(),
863 participant_index,
864 peer_id,
865 projects: participant.projects,
866 location,
867 role,
868 muted: true,
869 speaking: false,
870 video_tracks: Default::default(),
871 audio_tracks: Default::default(),
872 },
873 );
874
875 Audio::play_sound(Sound::Joined, cx);
876
877 if let Some(live_kit) = this.live_kit.as_ref() {
878 let video_tracks =
879 live_kit.room.remote_video_tracks(&user.id.to_string());
880 let audio_tracks =
881 live_kit.room.remote_audio_tracks(&user.id.to_string());
882 let publications = live_kit
883 .room
884 .remote_audio_track_publications(&user.id.to_string());
885
886 for track in video_tracks {
887 this.live_kit_room_updated(
888 RoomUpdate::SubscribedToRemoteVideoTrack(track),
889 cx,
890 )
891 .log_err();
892 }
893
894 for (track, publication) in
895 audio_tracks.iter().zip(publications.iter())
896 {
897 this.live_kit_room_updated(
898 RoomUpdate::SubscribedToRemoteAudioTrack(
899 track.clone(),
900 publication.clone(),
901 ),
902 cx,
903 )
904 .log_err();
905 }
906 }
907 }
908 }
909
910 this.remote_participants.retain(|user_id, participant| {
911 if this.participant_user_ids.contains(user_id) {
912 true
913 } else {
914 for project in &participant.projects {
915 cx.emit(Event::RemoteProjectUnshared {
916 project_id: project.id,
917 });
918 }
919 false
920 }
921 });
922 }
923
924 if let Some(pending_participants) = pending_participants.log_err() {
925 this.pending_participants = pending_participants;
926 for participant in &this.pending_participants {
927 this.participant_user_ids.insert(participant.id);
928 }
929 }
930
931 this.follows_by_leader_id_project_id.clear();
932 for follower in room.followers {
933 let project_id = follower.project_id;
934 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
935 (Some(leader), Some(follower)) => (leader, follower),
936
937 _ => {
938 log::error!("Follower message {follower:?} missing some state");
939 continue;
940 }
941 };
942
943 let list = this
944 .follows_by_leader_id_project_id
945 .entry((leader, project_id))
946 .or_default();
947 if !list.contains(&follower) {
948 list.push(follower);
949 }
950 }
951
952 this.pending_room_update.take();
953 if this.should_leave() {
954 log::info!("room is empty, leaving");
955 this.leave(cx).detach();
956 }
957
958 this.user_store.update(cx, |user_store, cx| {
959 let participant_indices_by_user_id = this
960 .remote_participants
961 .iter()
962 .map(|(user_id, participant)| (*user_id, participant.participant_index))
963 .collect();
964 user_store.set_participant_indices(participant_indices_by_user_id, cx);
965 });
966
967 this.check_invariants();
968 this.room_update_completed_tx.try_send(Some(())).ok();
969 cx.notify();
970 })
971 .ok();
972 }));
973
974 cx.notify();
975 Ok(())
976 }
977
978 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
979 let mut done_rx = self.room_update_completed_rx.clone();
980 async move {
981 while let Some(result) = done_rx.next().await {
982 if result.is_some() {
983 break;
984 }
985 }
986 }
987 }
988
989 fn live_kit_room_updated(&mut self, update: RoomUpdate, cx: &mut Context<Self>) -> Result<()> {
990 match update {
991 RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
992 let user_id = track.publisher_id().parse()?;
993 let track_id = track.sid().to_string();
994 let participant = self
995 .remote_participants
996 .get_mut(&user_id)
997 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
998 participant.video_tracks.insert(track_id.clone(), track);
999 cx.emit(Event::RemoteVideoTracksChanged {
1000 participant_id: participant.peer_id,
1001 });
1002 }
1003
1004 RoomUpdate::UnsubscribedFromRemoteVideoTrack {
1005 publisher_id,
1006 track_id,
1007 } => {
1008 let user_id = publisher_id.parse()?;
1009 let participant = self
1010 .remote_participants
1011 .get_mut(&user_id)
1012 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1013 participant.video_tracks.remove(&track_id);
1014 cx.emit(Event::RemoteVideoTracksChanged {
1015 participant_id: participant.peer_id,
1016 });
1017 }
1018
1019 RoomUpdate::ActiveSpeakersChanged { speakers } => {
1020 let mut speaker_ids = speakers
1021 .into_iter()
1022 .filter_map(|speaker_sid| speaker_sid.parse().ok())
1023 .collect::<Vec<u64>>();
1024 speaker_ids.sort_unstable();
1025 for (sid, participant) in &mut self.remote_participants {
1026 participant.speaking = speaker_ids.binary_search(sid).is_ok();
1027 }
1028 if let Some(id) = self.client.user_id() {
1029 if let Some(room) = &mut self.live_kit {
1030 room.speaking = speaker_ids.binary_search(&id).is_ok();
1031 }
1032 }
1033 }
1034
1035 RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1036 let mut found = false;
1037 for participant in &mut self.remote_participants.values_mut() {
1038 for track in participant.audio_tracks.values() {
1039 if track.sid() == track_id {
1040 found = true;
1041 break;
1042 }
1043 }
1044 if found {
1045 participant.muted = muted;
1046 break;
1047 }
1048 }
1049 }
1050
1051 RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1052 if let Some(live_kit) = &self.live_kit {
1053 if live_kit.deafened {
1054 track.stop();
1055 cx.foreground_executor()
1056 .spawn(publication.set_enabled(false))
1057 .detach();
1058 }
1059 }
1060
1061 let user_id = track.publisher_id().parse()?;
1062 let track_id = track.sid().to_string();
1063 let participant = self
1064 .remote_participants
1065 .get_mut(&user_id)
1066 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1067 participant.audio_tracks.insert(track_id.clone(), track);
1068 participant.muted = publication.is_muted();
1069
1070 cx.emit(Event::RemoteAudioTracksChanged {
1071 participant_id: participant.peer_id,
1072 });
1073 }
1074
1075 RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1076 publisher_id,
1077 track_id,
1078 } => {
1079 let user_id = publisher_id.parse()?;
1080 let participant = self
1081 .remote_participants
1082 .get_mut(&user_id)
1083 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1084 participant.audio_tracks.remove(&track_id);
1085 cx.emit(Event::RemoteAudioTracksChanged {
1086 participant_id: participant.peer_id,
1087 });
1088 }
1089
1090 RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1091 log::info!("unpublished audio track {}", publication.sid());
1092 if let Some(room) = &mut self.live_kit {
1093 room.microphone_track = LocalTrack::None;
1094 }
1095 }
1096
1097 RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1098 log::info!("unpublished video track {}", publication.sid());
1099 if let Some(room) = &mut self.live_kit {
1100 room.screen_track = LocalTrack::None;
1101 }
1102 }
1103
1104 RoomUpdate::LocalAudioTrackPublished { publication } => {
1105 log::info!("published audio track {}", publication.sid());
1106 }
1107
1108 RoomUpdate::LocalVideoTrackPublished { publication } => {
1109 log::info!("published video track {}", publication.sid());
1110 }
1111 }
1112
1113 cx.notify();
1114 Ok(())
1115 }
1116
1117 fn check_invariants(&self) {
1118 #[cfg(any(test, feature = "test-support"))]
1119 {
1120 for participant in self.remote_participants.values() {
1121 assert!(self.participant_user_ids.contains(&participant.user.id));
1122 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1123 }
1124
1125 for participant in &self.pending_participants {
1126 assert!(self.participant_user_ids.contains(&participant.id));
1127 assert_ne!(participant.id, self.client.user_id().unwrap());
1128 }
1129
1130 assert_eq!(
1131 self.participant_user_ids.len(),
1132 self.remote_participants.len() + self.pending_participants.len()
1133 );
1134 }
1135 }
1136
1137 pub(crate) fn call(
1138 &mut self,
1139 called_user_id: u64,
1140 initial_project_id: Option<u64>,
1141 cx: &mut Context<Self>,
1142 ) -> Task<Result<()>> {
1143 if self.status.is_offline() {
1144 return Task::ready(Err(anyhow!("room is offline")));
1145 }
1146
1147 cx.notify();
1148 let client = self.client.clone();
1149 let room_id = self.id;
1150 self.pending_call_count += 1;
1151 cx.spawn(async move |this, cx| {
1152 let result = client
1153 .request(proto::Call {
1154 room_id,
1155 called_user_id,
1156 initial_project_id,
1157 })
1158 .await;
1159 this.update(cx, |this, cx| {
1160 this.pending_call_count -= 1;
1161 if this.should_leave() {
1162 this.leave(cx).detach_and_log_err(cx);
1163 }
1164 })?;
1165 result?;
1166 Ok(())
1167 })
1168 }
1169
1170 pub fn join_project(
1171 &mut self,
1172 id: u64,
1173 language_registry: Arc<LanguageRegistry>,
1174 fs: Arc<dyn Fs>,
1175 cx: &mut Context<Self>,
1176 ) -> Task<Result<Entity<Project>>> {
1177 let client = self.client.clone();
1178 let user_store = self.user_store.clone();
1179 cx.emit(Event::RemoteProjectJoined { project_id: id });
1180 cx.spawn(async move |this, cx| {
1181 let project =
1182 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1183
1184 this.update(cx, |this, cx| {
1185 this.joined_projects.retain(|project| {
1186 if let Some(project) = project.upgrade() {
1187 !project.read(cx).is_disconnected(cx)
1188 } else {
1189 false
1190 }
1191 });
1192 this.joined_projects.insert(project.downgrade());
1193 })?;
1194 Ok(project)
1195 })
1196 }
1197
1198 pub fn share_project(
1199 &mut self,
1200 project: Entity<Project>,
1201 cx: &mut Context<Self>,
1202 ) -> Task<Result<u64>> {
1203 if let Some(project_id) = project.read(cx).remote_id() {
1204 return Task::ready(Ok(project_id));
1205 }
1206
1207 let request = self.client.request(proto::ShareProject {
1208 room_id: self.id(),
1209 worktrees: project.read(cx).worktree_metadata_protos(cx),
1210 is_ssh_project: project.read(cx).is_via_ssh(),
1211 });
1212
1213 cx.spawn(async move |this, cx| {
1214 let response = request.await?;
1215
1216 project.update(cx, |project, cx| project.shared(response.project_id, cx))??;
1217
1218 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1219 this.update(cx, |this, cx| {
1220 this.shared_projects.insert(project.downgrade());
1221 let active_project = this.local_participant.active_project.as_ref();
1222 if active_project.map_or(false, |location| *location == project) {
1223 this.set_location(Some(&project), cx)
1224 } else {
1225 Task::ready(Ok(()))
1226 }
1227 })?
1228 .await?;
1229
1230 Ok(response.project_id)
1231 })
1232 }
1233
1234 pub(crate) fn unshare_project(
1235 &mut self,
1236 project: Entity<Project>,
1237 cx: &mut Context<Self>,
1238 ) -> Result<()> {
1239 let project_id = match project.read(cx).remote_id() {
1240 Some(project_id) => project_id,
1241 None => return Ok(()),
1242 };
1243
1244 self.client.send(proto::UnshareProject { project_id })?;
1245 project.update(cx, |this, cx| this.unshare(cx))?;
1246
1247 if self.local_participant.active_project == Some(project.downgrade()) {
1248 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1249 }
1250 Ok(())
1251 }
1252
1253 pub(crate) fn set_location(
1254 &mut self,
1255 project: Option<&Entity<Project>>,
1256 cx: &mut Context<Self>,
1257 ) -> Task<Result<()>> {
1258 if self.status.is_offline() {
1259 return Task::ready(Err(anyhow!("room is offline")));
1260 }
1261
1262 let client = self.client.clone();
1263 let room_id = self.id;
1264 let location = if let Some(project) = project {
1265 self.local_participant.active_project = Some(project.downgrade());
1266 if let Some(project_id) = project.read(cx).remote_id() {
1267 proto::participant_location::Variant::SharedProject(
1268 proto::participant_location::SharedProject { id: project_id },
1269 )
1270 } else {
1271 proto::participant_location::Variant::UnsharedProject(
1272 proto::participant_location::UnsharedProject {},
1273 )
1274 }
1275 } else {
1276 self.local_participant.active_project = None;
1277 proto::participant_location::Variant::External(proto::participant_location::External {})
1278 };
1279
1280 cx.notify();
1281 cx.background_spawn(async move {
1282 client
1283 .request(proto::UpdateParticipantLocation {
1284 room_id,
1285 location: Some(proto::ParticipantLocation {
1286 variant: Some(location),
1287 }),
1288 })
1289 .await?;
1290 Ok(())
1291 })
1292 }
1293
1294 pub fn is_screen_sharing(&self) -> bool {
1295 self.live_kit.as_ref().map_or(false, |live_kit| {
1296 !matches!(live_kit.screen_track, LocalTrack::None)
1297 })
1298 }
1299
1300 pub fn is_sharing_mic(&self) -> bool {
1301 self.live_kit.as_ref().map_or(false, |live_kit| {
1302 !matches!(live_kit.microphone_track, LocalTrack::None)
1303 })
1304 }
1305
1306 pub fn is_muted(&self) -> bool {
1307 self.live_kit.as_ref().map_or(false, |live_kit| {
1308 matches!(live_kit.microphone_track, LocalTrack::None)
1309 || live_kit.muted_by_user
1310 || live_kit.deafened
1311 })
1312 }
1313
1314 pub fn muted_by_user(&self) -> bool {
1315 self.live_kit
1316 .as_ref()
1317 .map_or(false, |live_kit| live_kit.muted_by_user)
1318 }
1319
1320 pub fn is_speaking(&self) -> bool {
1321 self.live_kit
1322 .as_ref()
1323 .map_or(false, |live_kit| live_kit.speaking)
1324 }
1325
1326 pub fn is_deafened(&self) -> Option<bool> {
1327 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1328 }
1329
1330 pub fn can_use_microphone(&self) -> bool {
1331 use proto::ChannelRole::*;
1332 match self.local_participant.role {
1333 Admin | Member | Talker => true,
1334 Guest | Banned => false,
1335 }
1336 }
1337
1338 pub fn can_share_projects(&self) -> bool {
1339 use proto::ChannelRole::*;
1340 match self.local_participant.role {
1341 Admin | Member => true,
1342 Guest | Banned | Talker => false,
1343 }
1344 }
1345
1346 #[track_caller]
1347 pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1348 if self.status.is_offline() {
1349 return Task::ready(Err(anyhow!("room is offline")));
1350 }
1351
1352 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1353 let publish_id = post_inc(&mut live_kit.next_publish_id);
1354 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1355 cx.notify();
1356 publish_id
1357 } else {
1358 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1359 };
1360
1361 cx.spawn(async move |this, cx| {
1362 let publish_track = async {
1363 let track = LocalAudioTrack::create();
1364 this.upgrade()
1365 .ok_or_else(|| anyhow!("room was dropped"))?
1366 .update(cx, |this, _| {
1367 this.live_kit
1368 .as_ref()
1369 .map(|live_kit| live_kit.room.publish_audio_track(track))
1370 })?
1371 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1372 .await
1373 };
1374 let publication = publish_track.await;
1375 this.upgrade()
1376 .ok_or_else(|| anyhow!("room was dropped"))?
1377 .update(cx, |this, cx| {
1378 let live_kit = this
1379 .live_kit
1380 .as_mut()
1381 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1382
1383 let canceled = if let LocalTrack::Pending {
1384 publish_id: cur_publish_id,
1385 } = &live_kit.microphone_track
1386 {
1387 *cur_publish_id != publish_id
1388 } else {
1389 true
1390 };
1391
1392 match publication {
1393 Ok(publication) => {
1394 if canceled {
1395 live_kit.room.unpublish_track(publication);
1396 } else {
1397 if live_kit.muted_by_user || live_kit.deafened {
1398 cx.background_spawn(publication.set_mute(true)).detach();
1399 }
1400 live_kit.microphone_track = LocalTrack::Published {
1401 track_publication: publication,
1402 };
1403 cx.notify();
1404 }
1405 Ok(())
1406 }
1407 Err(error) => {
1408 if canceled {
1409 Ok(())
1410 } else {
1411 live_kit.microphone_track = LocalTrack::None;
1412 cx.notify();
1413 Err(error)
1414 }
1415 }
1416 }
1417 })?
1418 })
1419 }
1420
1421 pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1422 if self.status.is_offline() {
1423 return Task::ready(Err(anyhow!("room is offline")));
1424 } else if self.is_screen_sharing() {
1425 return Task::ready(Err(anyhow!("screen was already shared")));
1426 }
1427
1428 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1429 let publish_id = post_inc(&mut live_kit.next_publish_id);
1430 live_kit.screen_track = LocalTrack::Pending { publish_id };
1431 cx.notify();
1432 (live_kit.room.display_sources(), publish_id)
1433 } else {
1434 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1435 };
1436
1437 cx.spawn(async move |this, cx| {
1438 let publish_track = async {
1439 let displays = displays.await?;
1440 let display = displays
1441 .first()
1442 .ok_or_else(|| anyhow!("no display found"))?;
1443 let track = LocalVideoTrack::screen_share_for_display(display);
1444 this.upgrade()
1445 .ok_or_else(|| anyhow!("room was dropped"))?
1446 .update(cx, |this, _| {
1447 this.live_kit
1448 .as_ref()
1449 .map(|live_kit| live_kit.room.publish_video_track(track))
1450 })?
1451 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1452 .await
1453 };
1454
1455 let publication = publish_track.await;
1456 this.upgrade()
1457 .ok_or_else(|| anyhow!("room was dropped"))?
1458 .update(cx, |this, cx| {
1459 let live_kit = this
1460 .live_kit
1461 .as_mut()
1462 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1463
1464 let canceled = if let LocalTrack::Pending {
1465 publish_id: cur_publish_id,
1466 } = &live_kit.screen_track
1467 {
1468 *cur_publish_id != publish_id
1469 } else {
1470 true
1471 };
1472
1473 match publication {
1474 Ok(publication) => {
1475 if canceled {
1476 live_kit.room.unpublish_track(publication);
1477 } else {
1478 live_kit.screen_track = LocalTrack::Published {
1479 track_publication: publication,
1480 };
1481 cx.notify();
1482 }
1483
1484 Audio::play_sound(Sound::StartScreenshare, cx);
1485
1486 Ok(())
1487 }
1488 Err(error) => {
1489 if canceled {
1490 Ok(())
1491 } else {
1492 live_kit.screen_track = LocalTrack::None;
1493 cx.notify();
1494 Err(error)
1495 }
1496 }
1497 }
1498 })?
1499 })
1500 }
1501
1502 pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1503 if let Some(live_kit) = self.live_kit.as_mut() {
1504 // When unmuting, undeafen if the user was deafened before.
1505 let was_deafened = live_kit.deafened;
1506 if live_kit.muted_by_user
1507 || live_kit.deafened
1508 || matches!(live_kit.microphone_track, LocalTrack::None)
1509 {
1510 live_kit.muted_by_user = false;
1511 live_kit.deafened = false;
1512 } else {
1513 live_kit.muted_by_user = true;
1514 }
1515 let muted = live_kit.muted_by_user;
1516 let should_undeafen = was_deafened && !live_kit.deafened;
1517
1518 if let Some(task) = self.set_mute(muted, cx) {
1519 task.detach_and_log_err(cx);
1520 }
1521
1522 if should_undeafen {
1523 if let Some(task) = self.set_deafened(false, cx) {
1524 task.detach_and_log_err(cx);
1525 }
1526 }
1527 }
1528 }
1529
1530 pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1531 if let Some(live_kit) = self.live_kit.as_mut() {
1532 // When deafening, mute the microphone if it was not already muted.
1533 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1534 let deafened = !live_kit.deafened;
1535 live_kit.deafened = deafened;
1536 let should_change_mute = !live_kit.muted_by_user;
1537
1538 if let Some(task) = self.set_deafened(deafened, cx) {
1539 task.detach_and_log_err(cx);
1540 }
1541
1542 if should_change_mute {
1543 if let Some(task) = self.set_mute(deafened, cx) {
1544 task.detach_and_log_err(cx);
1545 }
1546 }
1547 }
1548 }
1549
1550 pub fn unshare_screen(&mut self, cx: &mut Context<Self>) -> Result<()> {
1551 if self.status.is_offline() {
1552 return Err(anyhow!("room is offline"));
1553 }
1554
1555 let live_kit = self
1556 .live_kit
1557 .as_mut()
1558 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1559 match mem::take(&mut live_kit.screen_track) {
1560 LocalTrack::None => Err(anyhow!("screen was not shared")),
1561 LocalTrack::Pending { .. } => {
1562 cx.notify();
1563 Ok(())
1564 }
1565 LocalTrack::Published {
1566 track_publication, ..
1567 } => {
1568 live_kit.room.unpublish_track(track_publication);
1569 cx.notify();
1570
1571 Audio::play_sound(Sound::StopScreenshare, cx);
1572 Ok(())
1573 }
1574 }
1575 }
1576
1577 fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<Task<Result<()>>> {
1578 let live_kit = self.live_kit.as_mut()?;
1579 cx.notify();
1580
1581 let mut track_updates = Vec::new();
1582 for participant in self.remote_participants.values() {
1583 for publication in live_kit
1584 .room
1585 .remote_audio_track_publications(&participant.user.id.to_string())
1586 {
1587 track_updates.push(publication.set_enabled(!deafened));
1588 }
1589
1590 for track in participant.audio_tracks.values() {
1591 if deafened {
1592 track.stop();
1593 } else {
1594 track.start();
1595 }
1596 }
1597 }
1598
1599 Some(cx.foreground_executor().spawn(async move {
1600 for result in futures::future::join_all(track_updates).await {
1601 result?;
1602 }
1603 Ok(())
1604 }))
1605 }
1606
1607 fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1608 let live_kit = self.live_kit.as_mut()?;
1609 cx.notify();
1610
1611 if should_mute {
1612 Audio::play_sound(Sound::Mute, cx);
1613 } else {
1614 Audio::play_sound(Sound::Unmute, cx);
1615 }
1616
1617 match &mut live_kit.microphone_track {
1618 LocalTrack::None => {
1619 if should_mute {
1620 None
1621 } else {
1622 Some(self.share_microphone(cx))
1623 }
1624 }
1625 LocalTrack::Pending { .. } => None,
1626 LocalTrack::Published { track_publication } => Some(
1627 cx.foreground_executor()
1628 .spawn(track_publication.set_mute(should_mute)),
1629 ),
1630 }
1631 }
1632
/// Test-only helper that forwards `sources` to the LiveKit room's
/// `set_display_sources`.
///
/// # Panics
///
/// Panics if LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<livekit_client_macos::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1641}
1642
/// LiveKit-related state owned by a `Room`: the LiveKit room handle, the local
/// track states, and the local mute/deafen/speaking flags.
struct LiveKitRoom {
    /// Handle to the LiveKit room, used to publish and unpublish local tracks,
    /// list display sources and look up remote audio publications.
    room: Arc<livekit_client_macos::Room>,
    /// State of the local screen-share track.
    screen_track: LocalTrack,
    /// State of the local microphone track.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    muted_by_user: bool,
    /// Whether remote audio is currently disabled for the local user.
    deafened: bool,
    /// Reported by `Room::is_speaking`.
    // NOTE(review): the code that updates this flag is outside this section.
    speaking: bool,
    /// Next id handed out (via `post_inc`) to tag a pending publish attempt, so
    /// that a finished attempt can tell whether it is still the current one.
    next_publish_id: usize,
    // NOTE(review): both tasks are created outside this section; presumably they
    // are stored here so they live exactly as long as this state. Confirm.
    _maintain_room: Task<()>,
    _handle_updates: Task<()>,
}
1655
1656impl LiveKitRoom {
1657 fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1658 if let LocalTrack::Published {
1659 track_publication, ..
1660 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1661 {
1662 self.room.unpublish_track(track_publication);
1663 cx.notify();
1664 }
1665
1666 if let LocalTrack::Published {
1667 track_publication, ..
1668 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1669 {
1670 self.room.unpublish_track(track_publication);
1671 cx.notify();
1672 }
1673 }
1674}
1675
1676enum LocalTrack {
1677 None,
1678 Pending {
1679 publish_id: usize,
1680 },
1681 Published {
1682 track_publication: LocalTrackPublication,
1683 },
1684}
1685
1686impl Default for LocalTrack {
1687 fn default() -> Self {
1688 Self::None
1689 }
1690}
1691
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    /// The room is attempting to rejoin.
    Rejoining,
    /// The room is offline; operations that need the server fail with
    /// "room is offline".
    Offline,
}
1698
1699impl RoomStatus {
1700 pub fn is_offline(&self) -> bool {
1701 matches!(self, RoomStatus::Offline)
1702 }
1703
1704 pub fn is_online(&self) -> bool {
1705 matches!(self, RoomStatus::Online)
1706 }
1707}