1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
16};
17use language::LanguageRegistry;
18use live_kit_client::{
19 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
20 RemoteVideoTrackUpdate,
21};
22use postage::{sink::Sink, stream::Stream, watch};
23use project::Project;
24use settings::Settings as _;
25use std::{future::Future, mem, sync::Arc, time::Duration};
26use util::{post_inc, ResultExt, TryFutureExt};
27
28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
29
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub enum Event {
32 ParticipantLocationChanged {
33 participant_id: proto::PeerId,
34 },
35 RemoteVideoTracksChanged {
36 participant_id: proto::PeerId,
37 },
38 RemoteAudioTracksChanged {
39 participant_id: proto::PeerId,
40 },
41 RemoteProjectShared {
42 owner: Arc<User>,
43 project_id: u64,
44 worktree_root_names: Vec<String>,
45 },
46 RemoteProjectUnshared {
47 project_id: u64,
48 },
49 RemoteProjectJoined {
50 project_id: u64,
51 },
52 RemoteProjectInvitationDiscarded {
53 project_id: u64,
54 },
55 Left,
56}
57
58pub struct Room {
59 id: u64,
60 channel_id: Option<u64>,
61 live_kit: Option<LiveKitRoom>,
62 status: RoomStatus,
63 shared_projects: HashSet<WeakModel<Project>>,
64 joined_projects: HashSet<WeakModel<Project>>,
65 local_participant: LocalParticipant,
66 remote_participants: BTreeMap<u64, RemoteParticipant>,
67 pending_participants: Vec<Arc<User>>,
68 participant_user_ids: HashSet<u64>,
69 pending_call_count: usize,
70 leave_when_empty: bool,
71 client: Arc<Client>,
72 user_store: Model<UserStore>,
73 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
74 client_subscriptions: Vec<client::Subscription>,
75 _subscriptions: Vec<gpui::Subscription>,
76 room_update_completed_tx: watch::Sender<Option<()>>,
77 room_update_completed_rx: watch::Receiver<Option<()>>,
78 pending_room_update: Option<Task<()>>,
79 maintain_connection: Option<Task<Option<()>>>,
80}
81
82impl EventEmitter<Event> for Room {}
83
84impl Room {
85 pub fn channel_id(&self) -> Option<u64> {
86 self.channel_id
87 }
88
89 pub fn is_sharing_project(&self) -> bool {
90 !self.shared_projects.is_empty()
91 }
92
93 #[cfg(any(test, feature = "test-support"))]
94 pub fn is_connected(&self) -> bool {
95 if let Some(live_kit) = self.live_kit.as_ref() {
96 matches!(
97 *live_kit.room.status().borrow(),
98 live_kit_client::ConnectionState::Connected { .. }
99 )
100 } else {
101 false
102 }
103 }
104
105 fn new(
106 id: u64,
107 channel_id: Option<u64>,
108 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
109 client: Arc<Client>,
110 user_store: Model<UserStore>,
111 cx: &mut ModelContext<Self>,
112 ) -> Self {
113 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
114 let room = live_kit_client::Room::new();
115 let mut status = room.status();
116 // Consume the initial status of the room.
117 let _ = status.try_recv();
118 let _maintain_room = cx.spawn(|this, mut cx| async move {
119 while let Some(status) = status.next().await {
120 let this = if let Some(this) = this.upgrade() {
121 this
122 } else {
123 break;
124 };
125
126 if status == live_kit_client::ConnectionState::Disconnected {
127 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
128 .ok();
129 break;
130 }
131 }
132 });
133
134 let _maintain_video_tracks = cx.spawn({
135 let room = room.clone();
136 move |this, mut cx| async move {
137 let mut track_video_changes = room.remote_video_track_updates();
138 while let Some(track_change) = track_video_changes.next().await {
139 let this = if let Some(this) = this.upgrade() {
140 this
141 } else {
142 break;
143 };
144
145 this.update(&mut cx, |this, cx| {
146 this.remote_video_track_updated(track_change, cx).log_err()
147 })
148 .ok();
149 }
150 }
151 });
152
153 let _maintain_audio_tracks = cx.spawn({
154 let room = room.clone();
155 |this, mut cx| async move {
156 let mut track_audio_changes = room.remote_audio_track_updates();
157 while let Some(track_change) = track_audio_changes.next().await {
158 let this = if let Some(this) = this.upgrade() {
159 this
160 } else {
161 break;
162 };
163
164 this.update(&mut cx, |this, cx| {
165 this.remote_audio_track_updated(track_change, cx).log_err()
166 })
167 .ok();
168 }
169 }
170 });
171
172 let connect = room.connect(&connection_info.server_url, &connection_info.token);
173 cx.spawn(|this, mut cx| async move {
174 connect.await?;
175
176 let is_read_only = this
177 .update(&mut cx, |room, _| room.read_only())
178 .unwrap_or(true);
179
180 if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only {
181 this.update(&mut cx, |this, cx| this.share_microphone(cx))?
182 .await?;
183 }
184
185 anyhow::Ok(())
186 })
187 .detach_and_log_err(cx);
188
189 Some(LiveKitRoom {
190 room,
191 screen_track: LocalTrack::None,
192 microphone_track: LocalTrack::None,
193 next_publish_id: 0,
194 muted_by_user: false,
195 deafened: false,
196 speaking: false,
197 _maintain_room,
198 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
199 })
200 } else {
201 None
202 };
203
204 let maintain_connection = cx.spawn({
205 let client = client.clone();
206 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
207 });
208
209 Audio::play_sound(Sound::Joined, cx);
210
211 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
212
213 Self {
214 id,
215 channel_id,
216 live_kit: live_kit_room,
217 status: RoomStatus::Online,
218 shared_projects: Default::default(),
219 joined_projects: Default::default(),
220 participant_user_ids: Default::default(),
221 local_participant: Default::default(),
222 remote_participants: Default::default(),
223 pending_participants: Default::default(),
224 pending_call_count: 0,
225 client_subscriptions: vec![
226 client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
227 ],
228 _subscriptions: vec![
229 cx.on_release(Self::released),
230 cx.on_app_quit(Self::app_will_quit),
231 ],
232 leave_when_empty: false,
233 pending_room_update: None,
234 client,
235 user_store,
236 follows_by_leader_id_project_id: Default::default(),
237 maintain_connection: Some(maintain_connection),
238 room_update_completed_tx,
239 room_update_completed_rx,
240 }
241 }
242
243 pub(crate) fn create(
244 called_user_id: u64,
245 initial_project: Option<Model<Project>>,
246 client: Arc<Client>,
247 user_store: Model<UserStore>,
248 cx: &mut AppContext,
249 ) -> Task<Result<Model<Self>>> {
250 cx.spawn(move |mut cx| async move {
251 let response = client.request(proto::CreateRoom {}).await?;
252 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
253 let room = cx.new_model(|cx| {
254 let mut room = Self::new(
255 room_proto.id,
256 None,
257 response.live_kit_connection_info,
258 client,
259 user_store,
260 cx,
261 );
262 if let Some(participant) = room_proto.participants.first() {
263 room.local_participant.role = participant.role()
264 }
265 room
266 })?;
267
268 let initial_project_id = if let Some(initial_project) = initial_project {
269 let initial_project_id = room
270 .update(&mut cx, |room, cx| {
271 room.share_project(initial_project.clone(), cx)
272 })?
273 .await?;
274 Some(initial_project_id)
275 } else {
276 None
277 };
278
279 match room
280 .update(&mut cx, |room, cx| {
281 room.leave_when_empty = true;
282 room.call(called_user_id, initial_project_id, cx)
283 })?
284 .await
285 {
286 Ok(()) => Ok(room),
287 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
288 }
289 })
290 }
291
292 pub(crate) async fn join_channel(
293 channel_id: u64,
294 client: Arc<Client>,
295 user_store: Model<UserStore>,
296 cx: AsyncAppContext,
297 ) -> Result<Model<Self>> {
298 Self::from_join_response(
299 client.request(proto::JoinChannel { channel_id }).await?,
300 client,
301 user_store,
302 cx,
303 )
304 }
305
306 pub(crate) async fn join(
307 room_id: u64,
308 client: Arc<Client>,
309 user_store: Model<UserStore>,
310 cx: AsyncAppContext,
311 ) -> Result<Model<Self>> {
312 Self::from_join_response(
313 client.request(proto::JoinRoom { id: room_id }).await?,
314 client,
315 user_store,
316 cx,
317 )
318 }
319
320 fn released(&mut self, cx: &mut AppContext) {
321 if self.status.is_online() {
322 self.leave_internal(cx).detach_and_log_err(cx);
323 }
324 }
325
326 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
327 let task = if self.status.is_online() {
328 let leave = self.leave_internal(cx);
329 Some(cx.background_executor().spawn(async move {
330 leave.await.log_err();
331 }))
332 } else {
333 None
334 };
335
336 async move {
337 if let Some(task) = task {
338 task.await;
339 }
340 }
341 }
342
343 pub fn mute_on_join(cx: &AppContext) -> bool {
344 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
345 }
346
347 fn from_join_response(
348 response: proto::JoinRoomResponse,
349 client: Arc<Client>,
350 user_store: Model<UserStore>,
351 mut cx: AsyncAppContext,
352 ) -> Result<Model<Self>> {
353 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
354 let room = cx.new_model(|cx| {
355 Self::new(
356 room_proto.id,
357 response.channel_id,
358 response.live_kit_connection_info,
359 client,
360 user_store,
361 cx,
362 )
363 })?;
364 room.update(&mut cx, |room, cx| {
365 room.leave_when_empty = room.channel_id.is_none();
366 room.apply_room_update(room_proto, cx)?;
367 anyhow::Ok(())
368 })??;
369 Ok(room)
370 }
371
372 fn should_leave(&self) -> bool {
373 self.leave_when_empty
374 && self.pending_room_update.is_none()
375 && self.pending_participants.is_empty()
376 && self.remote_participants.is_empty()
377 && self.pending_call_count == 0
378 }
379
380 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
381 cx.notify();
382 cx.emit(Event::Left);
383 self.leave_internal(cx)
384 }
385
386 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
387 if self.status.is_offline() {
388 return Task::ready(Err(anyhow!("room is offline")));
389 }
390
391 log::info!("leaving room");
392 Audio::play_sound(Sound::Leave, cx);
393
394 self.clear_state(cx);
395
396 let leave_room = self.client.request(proto::LeaveRoom {});
397 cx.background_executor().spawn(async move {
398 leave_room.await?;
399 anyhow::Ok(())
400 })
401 }
402
403 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
404 for project in self.shared_projects.drain() {
405 if let Some(project) = project.upgrade() {
406 project.update(cx, |project, cx| {
407 project.unshare(cx).log_err();
408 });
409 }
410 }
411 for project in self.joined_projects.drain() {
412 if let Some(project) = project.upgrade() {
413 project.update(cx, |project, cx| {
414 project.disconnected_from_host(cx);
415 project.close(cx);
416 });
417 }
418 }
419
420 self.status = RoomStatus::Offline;
421 self.remote_participants.clear();
422 self.pending_participants.clear();
423 self.participant_user_ids.clear();
424 self.client_subscriptions.clear();
425 self.live_kit.take();
426 self.pending_room_update.take();
427 self.maintain_connection.take();
428 }
429
430 async fn maintain_connection(
431 this: WeakModel<Self>,
432 client: Arc<Client>,
433 mut cx: AsyncAppContext,
434 ) -> Result<()> {
435 let mut client_status = client.status();
436 loop {
437 let _ = client_status.try_recv();
438 let is_connected = client_status.borrow().is_connected();
439 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
440 if !is_connected || client_status.next().await.is_some() {
441 log::info!("detected client disconnection");
442
443 this.upgrade()
444 .ok_or_else(|| anyhow!("room was dropped"))?
445 .update(&mut cx, |this, cx| {
446 this.status = RoomStatus::Rejoining;
447 cx.notify();
448 })?;
449
450 // Wait for client to re-establish a connection to the server.
451 {
452 let mut reconnection_timeout =
453 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
454 let client_reconnection = async {
455 let mut remaining_attempts = 3;
456 while remaining_attempts > 0 {
457 if client_status.borrow().is_connected() {
458 log::info!("client reconnected, attempting to rejoin room");
459
460 let Some(this) = this.upgrade() else { break };
461 match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
462 Ok(task) => {
463 if task.await.log_err().is_some() {
464 return true;
465 } else {
466 remaining_attempts -= 1;
467 }
468 }
469 Err(_app_dropped) => return false,
470 }
471 } else if client_status.borrow().is_signed_out() {
472 return false;
473 }
474
475 log::info!(
476 "waiting for client status change, remaining attempts {}",
477 remaining_attempts
478 );
479 client_status.next().await;
480 }
481 false
482 }
483 .fuse();
484 futures::pin_mut!(client_reconnection);
485
486 futures::select_biased! {
487 reconnected = client_reconnection => {
488 if reconnected {
489 log::info!("successfully reconnected to room");
490 // If we successfully joined the room, go back around the loop
491 // waiting for future connection status changes.
492 continue;
493 }
494 }
495 _ = reconnection_timeout => {
496 log::info!("room reconnection timeout expired");
497 }
498 }
499 }
500
501 break;
502 }
503 }
504
505 // The client failed to re-establish a connection to the server
506 // or an error occurred while trying to re-join the room. Either way
507 // we leave the room and return an error.
508 if let Some(this) = this.upgrade() {
509 log::info!("reconnection failed, leaving room");
510 let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
511 }
512 Err(anyhow!(
513 "can't reconnect to room: client failed to re-establish connection"
514 ))
515 }
516
517 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
518 let mut projects = HashMap::default();
519 let mut reshared_projects = Vec::new();
520 let mut rejoined_projects = Vec::new();
521 self.shared_projects.retain(|project| {
522 if let Some(handle) = project.upgrade() {
523 let project = handle.read(cx);
524 if let Some(project_id) = project.remote_id() {
525 projects.insert(project_id, handle.clone());
526 reshared_projects.push(proto::UpdateProject {
527 project_id,
528 worktrees: project.worktree_metadata_protos(cx),
529 });
530 return true;
531 }
532 }
533 false
534 });
535 self.joined_projects.retain(|project| {
536 if let Some(handle) = project.upgrade() {
537 let project = handle.read(cx);
538 if let Some(project_id) = project.remote_id() {
539 projects.insert(project_id, handle.clone());
540 rejoined_projects.push(proto::RejoinProject {
541 id: project_id,
542 worktrees: project
543 .worktrees()
544 .map(|worktree| {
545 let worktree = worktree.read(cx);
546 proto::RejoinWorktree {
547 id: worktree.id().to_proto(),
548 scan_id: worktree.completed_scan_id() as u64,
549 }
550 })
551 .collect(),
552 });
553 }
554 return true;
555 }
556 false
557 });
558
559 let response = self.client.request_envelope(proto::RejoinRoom {
560 id: self.id,
561 reshared_projects,
562 rejoined_projects,
563 });
564
565 cx.spawn(|this, mut cx| async move {
566 let response = response.await?;
567 let message_id = response.message_id;
568 let response = response.payload;
569 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
570 this.update(&mut cx, |this, cx| {
571 this.status = RoomStatus::Online;
572 this.apply_room_update(room_proto, cx)?;
573
574 for reshared_project in response.reshared_projects {
575 if let Some(project) = projects.get(&reshared_project.id) {
576 project.update(cx, |project, cx| {
577 project.reshared(reshared_project, cx).log_err();
578 });
579 }
580 }
581
582 for rejoined_project in response.rejoined_projects {
583 if let Some(project) = projects.get(&rejoined_project.id) {
584 project.update(cx, |project, cx| {
585 project.rejoined(rejoined_project, message_id, cx).log_err();
586 });
587 }
588 }
589
590 anyhow::Ok(())
591 })?
592 })
593 }
594
595 pub fn id(&self) -> u64 {
596 self.id
597 }
598
599 pub fn status(&self) -> RoomStatus {
600 self.status
601 }
602
603 pub fn local_participant(&self) -> &LocalParticipant {
604 &self.local_participant
605 }
606
607 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
608 &self.remote_participants
609 }
610
611 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
612 self.remote_participants
613 .values()
614 .find(|p| p.peer_id == peer_id)
615 }
616
617 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
618 self.remote_participants
619 .get(&user_id)
620 .map(|participant| participant.role)
621 }
622
623 pub fn local_participant_is_admin(&self) -> bool {
624 self.local_participant.role == proto::ChannelRole::Admin
625 }
626
627 pub fn set_participant_role(
628 &mut self,
629 user_id: u64,
630 role: proto::ChannelRole,
631 cx: &ModelContext<Self>,
632 ) -> Task<Result<()>> {
633 let client = self.client.clone();
634 let room_id = self.id;
635 let role = role.into();
636 cx.spawn(|_, _| async move {
637 client
638 .request(proto::SetRoomParticipantRole {
639 room_id,
640 user_id,
641 role,
642 })
643 .await
644 .map(|_| ())
645 })
646 }
647
648 pub fn pending_participants(&self) -> &[Arc<User>] {
649 &self.pending_participants
650 }
651
652 pub fn contains_participant(&self, user_id: u64) -> bool {
653 self.participant_user_ids.contains(&user_id)
654 }
655
656 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
657 self.follows_by_leader_id_project_id
658 .get(&(leader_id, project_id))
659 .map_or(&[], |v| v.as_slice())
660 }
661
662 /// Returns the most 'active' projects, defined as most people in the project
663 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
664 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
665 for participant in self.remote_participants.values() {
666 match participant.location {
667 ParticipantLocation::SharedProject { project_id } => {
668 project_hosts_and_guest_counts
669 .entry(project_id)
670 .or_default()
671 .1 += 1;
672 }
673 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
674 }
675 for project in &participant.projects {
676 project_hosts_and_guest_counts
677 .entry(project.id)
678 .or_default()
679 .0 = Some(participant.user.id);
680 }
681 }
682
683 if let Some(user) = self.user_store.read(cx).current_user() {
684 for project in &self.local_participant.projects {
685 project_hosts_and_guest_counts
686 .entry(project.id)
687 .or_default()
688 .0 = Some(user.id);
689 }
690 }
691
692 project_hosts_and_guest_counts
693 .into_iter()
694 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
695 .max_by_key(|(_, _, guest_count)| *guest_count)
696 .map(|(id, host, _)| (id, host))
697 }
698
699 async fn handle_room_updated(
700 this: Model<Self>,
701 envelope: TypedEnvelope<proto::RoomUpdated>,
702 _: Arc<Client>,
703 mut cx: AsyncAppContext,
704 ) -> Result<()> {
705 let room = envelope
706 .payload
707 .room
708 .ok_or_else(|| anyhow!("invalid room"))?;
709 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
710 }
711
712 fn apply_room_update(
713 &mut self,
714 mut room: proto::Room,
715 cx: &mut ModelContext<Self>,
716 ) -> Result<()> {
717 // Filter ourselves out from the room's participants.
718 let local_participant_ix = room
719 .participants
720 .iter()
721 .position(|participant| Some(participant.user_id) == self.client.user_id());
722 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
723
724 let pending_participant_user_ids = room
725 .pending_participants
726 .iter()
727 .map(|p| p.user_id)
728 .collect::<Vec<_>>();
729
730 let remote_participant_user_ids = room
731 .participants
732 .iter()
733 .map(|p| p.user_id)
734 .collect::<Vec<_>>();
735
736 let (remote_participants, pending_participants) =
737 self.user_store.update(cx, move |user_store, cx| {
738 (
739 user_store.get_users(remote_participant_user_ids, cx),
740 user_store.get_users(pending_participant_user_ids, cx),
741 )
742 });
743
744 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
745 let (remote_participants, pending_participants) =
746 futures::join!(remote_participants, pending_participants);
747
748 this.update(&mut cx, |this, cx| {
749 this.participant_user_ids.clear();
750
751 if let Some(participant) = local_participant {
752 let role = participant.role();
753 this.local_participant.projects = participant.projects;
754 if this.local_participant.role != role {
755 this.local_participant.role = role;
756
757 if role == proto::ChannelRole::Guest {
758 for project in mem::take(&mut this.shared_projects) {
759 if let Some(project) = project.upgrade() {
760 this.unshare_project(project, cx).log_err();
761 }
762 }
763 this.local_participant.projects.clear();
764 if let Some(live_kit_room) = &mut this.live_kit {
765 live_kit_room.stop_publishing(cx);
766 }
767 }
768
769 this.joined_projects.retain(|project| {
770 if let Some(project) = project.upgrade() {
771 project.update(cx, |project, cx| project.set_role(role, cx));
772 true
773 } else {
774 false
775 }
776 });
777 }
778 } else {
779 this.local_participant.projects.clear();
780 }
781
782 if let Some(participants) = remote_participants.log_err() {
783 for (participant, user) in room.participants.into_iter().zip(participants) {
784 let Some(peer_id) = participant.peer_id else {
785 continue;
786 };
787 let participant_index = ParticipantIndex(participant.participant_index);
788 this.participant_user_ids.insert(participant.user_id);
789
790 let old_projects = this
791 .remote_participants
792 .get(&participant.user_id)
793 .into_iter()
794 .flat_map(|existing| &existing.projects)
795 .map(|project| project.id)
796 .collect::<HashSet<_>>();
797 let new_projects = participant
798 .projects
799 .iter()
800 .map(|project| project.id)
801 .collect::<HashSet<_>>();
802
803 for project in &participant.projects {
804 if !old_projects.contains(&project.id) {
805 cx.emit(Event::RemoteProjectShared {
806 owner: user.clone(),
807 project_id: project.id,
808 worktree_root_names: project.worktree_root_names.clone(),
809 });
810 }
811 }
812
813 for unshared_project_id in old_projects.difference(&new_projects) {
814 this.joined_projects.retain(|project| {
815 if let Some(project) = project.upgrade() {
816 project.update(cx, |project, cx| {
817 if project.remote_id() == Some(*unshared_project_id) {
818 project.disconnected_from_host(cx);
819 false
820 } else {
821 true
822 }
823 })
824 } else {
825 false
826 }
827 });
828 cx.emit(Event::RemoteProjectUnshared {
829 project_id: *unshared_project_id,
830 });
831 }
832
833 let role = participant.role();
834 let location = ParticipantLocation::from_proto(participant.location)
835 .unwrap_or(ParticipantLocation::External);
836 if let Some(remote_participant) =
837 this.remote_participants.get_mut(&participant.user_id)
838 {
839 remote_participant.peer_id = peer_id;
840 remote_participant.projects = participant.projects;
841 remote_participant.participant_index = participant_index;
842 if location != remote_participant.location
843 || role != remote_participant.role
844 {
845 remote_participant.location = location;
846 remote_participant.role = role;
847 cx.emit(Event::ParticipantLocationChanged {
848 participant_id: peer_id,
849 });
850 }
851 } else {
852 this.remote_participants.insert(
853 participant.user_id,
854 RemoteParticipant {
855 user: user.clone(),
856 participant_index,
857 peer_id,
858 projects: participant.projects,
859 location,
860 role,
861 muted: true,
862 speaking: false,
863 video_tracks: Default::default(),
864 audio_tracks: Default::default(),
865 },
866 );
867
868 Audio::play_sound(Sound::Joined, cx);
869
870 if let Some(live_kit) = this.live_kit.as_ref() {
871 let video_tracks =
872 live_kit.room.remote_video_tracks(&user.id.to_string());
873 let audio_tracks =
874 live_kit.room.remote_audio_tracks(&user.id.to_string());
875 let publications = live_kit
876 .room
877 .remote_audio_track_publications(&user.id.to_string());
878
879 for track in video_tracks {
880 this.remote_video_track_updated(
881 RemoteVideoTrackUpdate::Subscribed(track),
882 cx,
883 )
884 .log_err();
885 }
886
887 for (track, publication) in
888 audio_tracks.iter().zip(publications.iter())
889 {
890 this.remote_audio_track_updated(
891 RemoteAudioTrackUpdate::Subscribed(
892 track.clone(),
893 publication.clone(),
894 ),
895 cx,
896 )
897 .log_err();
898 }
899 }
900 }
901 }
902
903 this.remote_participants.retain(|user_id, participant| {
904 if this.participant_user_ids.contains(user_id) {
905 true
906 } else {
907 for project in &participant.projects {
908 cx.emit(Event::RemoteProjectUnshared {
909 project_id: project.id,
910 });
911 }
912 false
913 }
914 });
915 }
916
917 if let Some(pending_participants) = pending_participants.log_err() {
918 this.pending_participants = pending_participants;
919 for participant in &this.pending_participants {
920 this.participant_user_ids.insert(participant.id);
921 }
922 }
923
924 this.follows_by_leader_id_project_id.clear();
925 for follower in room.followers {
926 let project_id = follower.project_id;
927 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
928 (Some(leader), Some(follower)) => (leader, follower),
929
930 _ => {
931 log::error!("Follower message {follower:?} missing some state");
932 continue;
933 }
934 };
935
936 let list = this
937 .follows_by_leader_id_project_id
938 .entry((leader, project_id))
939 .or_insert(Vec::new());
940 if !list.contains(&follower) {
941 list.push(follower);
942 }
943 }
944
945 this.pending_room_update.take();
946 if this.should_leave() {
947 log::info!("room is empty, leaving");
948 let _ = this.leave(cx);
949 }
950
951 this.user_store.update(cx, |user_store, cx| {
952 let participant_indices_by_user_id = this
953 .remote_participants
954 .iter()
955 .map(|(user_id, participant)| (*user_id, participant.participant_index))
956 .collect();
957 user_store.set_participant_indices(participant_indices_by_user_id, cx);
958 });
959
960 this.check_invariants();
961 this.room_update_completed_tx.try_send(Some(())).ok();
962 cx.notify();
963 })
964 .ok();
965 }));
966
967 cx.notify();
968 Ok(())
969 }
970
971 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
972 let mut done_rx = self.room_update_completed_rx.clone();
973 async move {
974 while let Some(result) = done_rx.next().await {
975 if result.is_some() {
976 break;
977 }
978 }
979 }
980 }
981
982 fn remote_video_track_updated(
983 &mut self,
984 change: RemoteVideoTrackUpdate,
985 cx: &mut ModelContext<Self>,
986 ) -> Result<()> {
987 match change {
988 RemoteVideoTrackUpdate::Subscribed(track) => {
989 let user_id = track.publisher_id().parse()?;
990 let track_id = track.sid().to_string();
991 let participant = self
992 .remote_participants
993 .get_mut(&user_id)
994 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
995 participant.video_tracks.insert(track_id.clone(), track);
996 cx.emit(Event::RemoteVideoTracksChanged {
997 participant_id: participant.peer_id,
998 });
999 }
1000 RemoteVideoTrackUpdate::Unsubscribed {
1001 publisher_id,
1002 track_id,
1003 } => {
1004 let user_id = publisher_id.parse()?;
1005 let participant = self
1006 .remote_participants
1007 .get_mut(&user_id)
1008 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1009 participant.video_tracks.remove(&track_id);
1010 cx.emit(Event::RemoteVideoTracksChanged {
1011 participant_id: participant.peer_id,
1012 });
1013 }
1014 }
1015
1016 cx.notify();
1017 Ok(())
1018 }
1019
1020 fn remote_audio_track_updated(
1021 &mut self,
1022 change: RemoteAudioTrackUpdate,
1023 cx: &mut ModelContext<Self>,
1024 ) -> Result<()> {
1025 match change {
1026 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
1027 let mut speaker_ids = speakers
1028 .into_iter()
1029 .filter_map(|speaker_sid| speaker_sid.parse().ok())
1030 .collect::<Vec<u64>>();
1031 speaker_ids.sort_unstable();
1032 for (sid, participant) in &mut self.remote_participants {
1033 if let Ok(_) = speaker_ids.binary_search(sid) {
1034 participant.speaking = true;
1035 } else {
1036 participant.speaking = false;
1037 }
1038 }
1039 if let Some(id) = self.client.user_id() {
1040 if let Some(room) = &mut self.live_kit {
1041 if let Ok(_) = speaker_ids.binary_search(&id) {
1042 room.speaking = true;
1043 } else {
1044 room.speaking = false;
1045 }
1046 }
1047 }
1048 cx.notify();
1049 }
1050 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
1051 let mut found = false;
1052 for participant in &mut self.remote_participants.values_mut() {
1053 for track in participant.audio_tracks.values() {
1054 if track.sid() == track_id {
1055 found = true;
1056 break;
1057 }
1058 }
1059 if found {
1060 participant.muted = muted;
1061 break;
1062 }
1063 }
1064
1065 cx.notify();
1066 }
1067 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1068 let user_id = track.publisher_id().parse()?;
1069 let track_id = track.sid().to_string();
1070 let participant = self
1071 .remote_participants
1072 .get_mut(&user_id)
1073 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1074 participant.audio_tracks.insert(track_id.clone(), track);
1075 participant.muted = publication.is_muted();
1076
1077 cx.emit(Event::RemoteAudioTracksChanged {
1078 participant_id: participant.peer_id,
1079 });
1080 }
1081 RemoteAudioTrackUpdate::Unsubscribed {
1082 publisher_id,
1083 track_id,
1084 } => {
1085 let user_id = publisher_id.parse()?;
1086 let participant = self
1087 .remote_participants
1088 .get_mut(&user_id)
1089 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1090 participant.audio_tracks.remove(&track_id);
1091 cx.emit(Event::RemoteAudioTracksChanged {
1092 participant_id: participant.peer_id,
1093 });
1094 }
1095 }
1096
1097 cx.notify();
1098 Ok(())
1099 }
1100
1101 fn check_invariants(&self) {
1102 #[cfg(any(test, feature = "test-support"))]
1103 {
1104 for participant in self.remote_participants.values() {
1105 assert!(self.participant_user_ids.contains(&participant.user.id));
1106 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1107 }
1108
1109 for participant in &self.pending_participants {
1110 assert!(self.participant_user_ids.contains(&participant.id));
1111 assert_ne!(participant.id, self.client.user_id().unwrap());
1112 }
1113
1114 assert_eq!(
1115 self.participant_user_ids.len(),
1116 self.remote_participants.len() + self.pending_participants.len()
1117 );
1118 }
1119 }
1120
1121 pub(crate) fn call(
1122 &mut self,
1123 called_user_id: u64,
1124 initial_project_id: Option<u64>,
1125 cx: &mut ModelContext<Self>,
1126 ) -> Task<Result<()>> {
1127 if self.status.is_offline() {
1128 return Task::ready(Err(anyhow!("room is offline")));
1129 }
1130
1131 cx.notify();
1132 let client = self.client.clone();
1133 let room_id = self.id;
1134 self.pending_call_count += 1;
1135 cx.spawn(move |this, mut cx| async move {
1136 let result = client
1137 .request(proto::Call {
1138 room_id,
1139 called_user_id,
1140 initial_project_id,
1141 })
1142 .await;
1143 this.update(&mut cx, |this, cx| {
1144 this.pending_call_count -= 1;
1145 if this.should_leave() {
1146 this.leave(cx).detach_and_log_err(cx);
1147 }
1148 })?;
1149 result?;
1150 Ok(())
1151 })
1152 }
1153
1154 pub fn join_project(
1155 &mut self,
1156 id: u64,
1157 language_registry: Arc<LanguageRegistry>,
1158 fs: Arc<dyn Fs>,
1159 cx: &mut ModelContext<Self>,
1160 ) -> Task<Result<Model<Project>>> {
1161 let client = self.client.clone();
1162 let user_store = self.user_store.clone();
1163 let role = self.local_participant.role;
1164 cx.emit(Event::RemoteProjectJoined { project_id: id });
1165 cx.spawn(move |this, mut cx| async move {
1166 let project = Project::remote(
1167 id,
1168 client,
1169 user_store,
1170 language_registry,
1171 fs,
1172 role,
1173 cx.clone(),
1174 )
1175 .await?;
1176
1177 this.update(&mut cx, |this, cx| {
1178 this.joined_projects.retain(|project| {
1179 if let Some(project) = project.upgrade() {
1180 !project.read(cx).is_disconnected()
1181 } else {
1182 false
1183 }
1184 });
1185 this.joined_projects.insert(project.downgrade());
1186 })?;
1187 Ok(project)
1188 })
1189 }
1190
1191 pub(crate) fn share_project(
1192 &mut self,
1193 project: Model<Project>,
1194 cx: &mut ModelContext<Self>,
1195 ) -> Task<Result<u64>> {
1196 if let Some(project_id) = project.read(cx).remote_id() {
1197 return Task::ready(Ok(project_id));
1198 }
1199
1200 let request = self.client.request(proto::ShareProject {
1201 room_id: self.id(),
1202 worktrees: project.read(cx).worktree_metadata_protos(cx),
1203 });
1204 cx.spawn(|this, mut cx| async move {
1205 let response = request.await?;
1206
1207 project.update(&mut cx, |project, cx| {
1208 project.shared(response.project_id, cx)
1209 })??;
1210
1211 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1212 this.update(&mut cx, |this, cx| {
1213 this.shared_projects.insert(project.downgrade());
1214 let active_project = this.local_participant.active_project.as_ref();
1215 if active_project.map_or(false, |location| *location == project) {
1216 this.set_location(Some(&project), cx)
1217 } else {
1218 Task::ready(Ok(()))
1219 }
1220 })?
1221 .await?;
1222
1223 Ok(response.project_id)
1224 })
1225 }
1226
1227 pub(crate) fn unshare_project(
1228 &mut self,
1229 project: Model<Project>,
1230 cx: &mut ModelContext<Self>,
1231 ) -> Result<()> {
1232 let project_id = match project.read(cx).remote_id() {
1233 Some(project_id) => project_id,
1234 None => return Ok(()),
1235 };
1236
1237 self.client.send(proto::UnshareProject { project_id })?;
1238 project.update(cx, |this, cx| this.unshare(cx))
1239 }
1240
1241 pub(crate) fn set_location(
1242 &mut self,
1243 project: Option<&Model<Project>>,
1244 cx: &mut ModelContext<Self>,
1245 ) -> Task<Result<()>> {
1246 if self.status.is_offline() {
1247 return Task::ready(Err(anyhow!("room is offline")));
1248 }
1249
1250 let client = self.client.clone();
1251 let room_id = self.id;
1252 let location = if let Some(project) = project {
1253 self.local_participant.active_project = Some(project.downgrade());
1254 if let Some(project_id) = project.read(cx).remote_id() {
1255 proto::participant_location::Variant::SharedProject(
1256 proto::participant_location::SharedProject { id: project_id },
1257 )
1258 } else {
1259 proto::participant_location::Variant::UnsharedProject(
1260 proto::participant_location::UnsharedProject {},
1261 )
1262 }
1263 } else {
1264 self.local_participant.active_project = None;
1265 proto::participant_location::Variant::External(proto::participant_location::External {})
1266 };
1267
1268 cx.notify();
1269 cx.background_executor().spawn(async move {
1270 client
1271 .request(proto::UpdateParticipantLocation {
1272 room_id,
1273 location: Some(proto::ParticipantLocation {
1274 variant: Some(location),
1275 }),
1276 })
1277 .await?;
1278 Ok(())
1279 })
1280 }
1281
1282 pub fn is_screen_sharing(&self) -> bool {
1283 self.live_kit.as_ref().map_or(false, |live_kit| {
1284 !matches!(live_kit.screen_track, LocalTrack::None)
1285 })
1286 }
1287
1288 pub fn is_sharing_mic(&self) -> bool {
1289 self.live_kit.as_ref().map_or(false, |live_kit| {
1290 !matches!(live_kit.microphone_track, LocalTrack::None)
1291 })
1292 }
1293
1294 pub fn is_muted(&self, cx: &AppContext) -> bool {
1295 self.live_kit
1296 .as_ref()
1297 .and_then(|live_kit| match &live_kit.microphone_track {
1298 LocalTrack::None => Some(Self::mute_on_join(cx)),
1299 LocalTrack::Pending { muted, .. } => Some(*muted),
1300 LocalTrack::Published { muted, .. } => Some(*muted),
1301 })
1302 .unwrap_or(false)
1303 }
1304
1305 pub fn read_only(&self) -> bool {
1306 !(self.local_participant().role == proto::ChannelRole::Member
1307 || self.local_participant().role == proto::ChannelRole::Admin)
1308 }
1309
1310 pub fn is_speaking(&self) -> bool {
1311 self.live_kit
1312 .as_ref()
1313 .map_or(false, |live_kit| live_kit.speaking)
1314 }
1315
1316 pub fn is_deafened(&self) -> Option<bool> {
1317 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1318 }
1319
1320 #[track_caller]
1321 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1322 if self.status.is_offline() {
1323 return Task::ready(Err(anyhow!("room is offline")));
1324 } else if self.is_sharing_mic() {
1325 return Task::ready(Err(anyhow!("microphone was already shared")));
1326 }
1327
1328 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1329 let publish_id = post_inc(&mut live_kit.next_publish_id);
1330 live_kit.microphone_track = LocalTrack::Pending {
1331 publish_id,
1332 muted: false,
1333 };
1334 cx.notify();
1335 publish_id
1336 } else {
1337 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1338 };
1339
1340 cx.spawn(move |this, mut cx| async move {
1341 let publish_track = async {
1342 let track = LocalAudioTrack::create();
1343 this.upgrade()
1344 .ok_or_else(|| anyhow!("room was dropped"))?
1345 .update(&mut cx, |this, _| {
1346 this.live_kit
1347 .as_ref()
1348 .map(|live_kit| live_kit.room.publish_audio_track(track))
1349 })?
1350 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1351 .await
1352 };
1353 let publication = publish_track.await;
1354 this.upgrade()
1355 .ok_or_else(|| anyhow!("room was dropped"))?
1356 .update(&mut cx, |this, cx| {
1357 let live_kit = this
1358 .live_kit
1359 .as_mut()
1360 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1361
1362 let (canceled, muted) = if let LocalTrack::Pending {
1363 publish_id: cur_publish_id,
1364 muted,
1365 } = &live_kit.microphone_track
1366 {
1367 (*cur_publish_id != publish_id, *muted)
1368 } else {
1369 (true, false)
1370 };
1371
1372 match publication {
1373 Ok(publication) => {
1374 if canceled {
1375 live_kit.room.unpublish_track(publication);
1376 } else {
1377 if muted {
1378 cx.background_executor()
1379 .spawn(publication.set_mute(muted))
1380 .detach();
1381 }
1382 live_kit.microphone_track = LocalTrack::Published {
1383 track_publication: publication,
1384 muted,
1385 };
1386 cx.notify();
1387 }
1388 Ok(())
1389 }
1390 Err(error) => {
1391 if canceled {
1392 Ok(())
1393 } else {
1394 live_kit.microphone_track = LocalTrack::None;
1395 cx.notify();
1396 Err(error)
1397 }
1398 }
1399 }
1400 })?
1401 })
1402 }
1403
1404 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1405 if self.status.is_offline() {
1406 return Task::ready(Err(anyhow!("room is offline")));
1407 } else if self.is_screen_sharing() {
1408 return Task::ready(Err(anyhow!("screen was already shared")));
1409 }
1410
1411 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1412 let publish_id = post_inc(&mut live_kit.next_publish_id);
1413 live_kit.screen_track = LocalTrack::Pending {
1414 publish_id,
1415 muted: false,
1416 };
1417 cx.notify();
1418 (live_kit.room.display_sources(), publish_id)
1419 } else {
1420 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1421 };
1422
1423 cx.spawn(move |this, mut cx| async move {
1424 let publish_track = async {
1425 let displays = displays.await?;
1426 let display = displays
1427 .first()
1428 .ok_or_else(|| anyhow!("no display found"))?;
1429 let track = LocalVideoTrack::screen_share_for_display(&display);
1430 this.upgrade()
1431 .ok_or_else(|| anyhow!("room was dropped"))?
1432 .update(&mut cx, |this, _| {
1433 this.live_kit
1434 .as_ref()
1435 .map(|live_kit| live_kit.room.publish_video_track(track))
1436 })?
1437 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1438 .await
1439 };
1440
1441 let publication = publish_track.await;
1442 this.upgrade()
1443 .ok_or_else(|| anyhow!("room was dropped"))?
1444 .update(&mut cx, |this, cx| {
1445 let live_kit = this
1446 .live_kit
1447 .as_mut()
1448 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1449
1450 let (canceled, muted) = if let LocalTrack::Pending {
1451 publish_id: cur_publish_id,
1452 muted,
1453 } = &live_kit.screen_track
1454 {
1455 (*cur_publish_id != publish_id, *muted)
1456 } else {
1457 (true, false)
1458 };
1459
1460 match publication {
1461 Ok(publication) => {
1462 if canceled {
1463 live_kit.room.unpublish_track(publication);
1464 } else {
1465 if muted {
1466 cx.background_executor()
1467 .spawn(publication.set_mute(muted))
1468 .detach();
1469 }
1470 live_kit.screen_track = LocalTrack::Published {
1471 track_publication: publication,
1472 muted,
1473 };
1474 cx.notify();
1475 }
1476
1477 Audio::play_sound(Sound::StartScreenshare, cx);
1478
1479 Ok(())
1480 }
1481 Err(error) => {
1482 if canceled {
1483 Ok(())
1484 } else {
1485 live_kit.screen_track = LocalTrack::None;
1486 cx.notify();
1487 Err(error)
1488 }
1489 }
1490 }
1491 })?
1492 })
1493 }
1494
1495 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1496 let should_mute = !self.is_muted(cx);
1497 if let Some(live_kit) = self.live_kit.as_mut() {
1498 if matches!(live_kit.microphone_track, LocalTrack::None) {
1499 return Ok(self.share_microphone(cx));
1500 }
1501
1502 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1503 live_kit.muted_by_user = should_mute;
1504
1505 if old_muted == true && live_kit.deafened == true {
1506 if let Some(task) = self.toggle_deafen(cx).ok() {
1507 task.detach();
1508 }
1509 }
1510
1511 Ok(ret_task)
1512 } else {
1513 Err(anyhow!("LiveKit not started"))
1514 }
1515 }
1516
1517 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1518 if let Some(live_kit) = self.live_kit.as_mut() {
1519 (*live_kit).deafened = !live_kit.deafened;
1520
1521 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1522 // Context notification is sent within set_mute itself.
1523 let mut mute_task = None;
1524 // When deafening, mute user's mic as well.
1525 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1526 if live_kit.deafened || !live_kit.muted_by_user {
1527 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1528 };
1529 for participant in self.remote_participants.values() {
1530 for track in live_kit
1531 .room
1532 .remote_audio_track_publications(&participant.user.id.to_string())
1533 {
1534 let deafened = live_kit.deafened;
1535 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1536 }
1537 }
1538
1539 Ok(cx.foreground_executor().spawn(async move {
1540 if let Some(mute_task) = mute_task {
1541 mute_task.await?;
1542 }
1543 for task in tasks {
1544 task.await?;
1545 }
1546 Ok(())
1547 }))
1548 } else {
1549 Err(anyhow!("LiveKit not started"))
1550 }
1551 }
1552
1553 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1554 if self.status.is_offline() {
1555 return Err(anyhow!("room is offline"));
1556 }
1557
1558 let live_kit = self
1559 .live_kit
1560 .as_mut()
1561 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1562 match mem::take(&mut live_kit.screen_track) {
1563 LocalTrack::None => Err(anyhow!("screen was not shared")),
1564 LocalTrack::Pending { .. } => {
1565 cx.notify();
1566 Ok(())
1567 }
1568 LocalTrack::Published {
1569 track_publication, ..
1570 } => {
1571 live_kit.room.unpublish_track(track_publication);
1572 cx.notify();
1573
1574 Audio::play_sound(Sound::StopScreenshare, cx);
1575 Ok(())
1576 }
1577 }
1578 }
1579
1580 #[cfg(any(test, feature = "test-support"))]
1581 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1582 self.live_kit
1583 .as_ref()
1584 .unwrap()
1585 .room
1586 .set_display_sources(sources);
1587 }
1588}
1589
1590struct LiveKitRoom {
1591 room: Arc<live_kit_client::Room>,
1592 screen_track: LocalTrack,
1593 microphone_track: LocalTrack,
1594 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1595 muted_by_user: bool,
1596 deafened: bool,
1597 speaking: bool,
1598 next_publish_id: usize,
1599 _maintain_room: Task<()>,
1600 _maintain_tracks: [Task<()>; 2],
1601}
1602
1603impl LiveKitRoom {
1604 fn set_mute(
1605 self: &mut LiveKitRoom,
1606 should_mute: bool,
1607 cx: &mut ModelContext<Room>,
1608 ) -> Result<(Task<Result<()>>, bool)> {
1609 if !should_mute {
1610 // clear user muting state.
1611 self.muted_by_user = false;
1612 }
1613
1614 let (result, old_muted) = match &mut self.microphone_track {
1615 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1616 LocalTrack::Pending { muted, .. } => {
1617 let old_muted = *muted;
1618 *muted = should_mute;
1619 cx.notify();
1620 Ok((Task::Ready(Some(Ok(()))), old_muted))
1621 }
1622 LocalTrack::Published {
1623 track_publication,
1624 muted,
1625 } => {
1626 let old_muted = *muted;
1627 *muted = should_mute;
1628 cx.notify();
1629 Ok((
1630 cx.background_executor()
1631 .spawn(track_publication.set_mute(*muted)),
1632 old_muted,
1633 ))
1634 }
1635 }?;
1636
1637 if old_muted != should_mute {
1638 if should_mute {
1639 Audio::play_sound(Sound::Mute, cx);
1640 } else {
1641 Audio::play_sound(Sound::Unmute, cx);
1642 }
1643 }
1644
1645 Ok((result, old_muted))
1646 }
1647
1648 fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1649 if let LocalTrack::Published {
1650 track_publication, ..
1651 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1652 {
1653 self.room.unpublish_track(track_publication);
1654 cx.notify();
1655 }
1656
1657 if let LocalTrack::Published {
1658 track_publication, ..
1659 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1660 {
1661 self.room.unpublish_track(track_publication);
1662 cx.notify();
1663 }
1664 }
1665}
1666
1667enum LocalTrack {
1668 None,
1669 Pending {
1670 publish_id: usize,
1671 muted: bool,
1672 },
1673 Published {
1674 track_publication: LocalTrackPublication,
1675 muted: bool,
1676 },
1677}
1678
1679impl Default for LocalTrack {
1680 fn default() -> Self {
1681 Self::None
1682 }
1683}
1684
1685#[derive(Copy, Clone, PartialEq, Eq)]
1686pub enum RoomStatus {
1687 Online,
1688 Rejoining,
1689 Offline,
1690}
1691
1692impl RoomStatus {
1693 pub fn is_offline(&self) -> bool {
1694 matches!(self, RoomStatus::Offline)
1695 }
1696
1697 pub fn is_online(&self) -> bool {
1698 matches!(self, RoomStatus::Online)
1699 }
1700}