1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum Event {
30 ParticipantLocationChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteVideoTracksChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteAudioTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteProjectShared {
40 owner: Arc<User>,
41 project_id: u64,
42 worktree_root_names: Vec<String>,
43 },
44 RemoteProjectUnshared {
45 project_id: u64,
46 },
47 RemoteProjectJoined {
48 project_id: u64,
49 },
50 RemoteProjectInvitationDiscarded {
51 project_id: u64,
52 },
53 Left,
54}
55
56pub struct Room {
57 id: u64,
58 channel_id: Option<u64>,
59 live_kit: Option<LiveKitRoom>,
60 status: RoomStatus,
61 shared_projects: HashSet<WeakModelHandle<Project>>,
62 joined_projects: HashSet<WeakModelHandle<Project>>,
63 local_participant: LocalParticipant,
64 remote_participants: BTreeMap<u64, RemoteParticipant>,
65 pending_participants: Vec<Arc<User>>,
66 participant_user_ids: HashSet<u64>,
67 pending_call_count: usize,
68 leave_when_empty: bool,
69 client: Arc<Client>,
70 user_store: ModelHandle<UserStore>,
71 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
72 subscriptions: Vec<client::Subscription>,
73 pending_room_update: Option<Task<()>>,
74 maintain_connection: Option<Task<Option<()>>>,
75}
76
77impl Entity for Room {
78 type Event = Event;
79
80 fn release(&mut self, cx: &mut AppContext) {
81 if self.status.is_online() {
82 self.leave_internal(cx).detach_and_log_err(cx);
83 }
84 }
85
86 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
87 if self.status.is_online() {
88 let leave = self.leave_internal(cx);
89 Some(
90 cx.background()
91 .spawn(async move {
92 leave.await.log_err();
93 })
94 .boxed(),
95 )
96 } else {
97 None
98 }
99 }
100}
101
102impl Room {
103 pub fn channel_id(&self) -> Option<u64> {
104 self.channel_id
105 }
106
107 pub fn is_sharing_project(&self) -> bool {
108 !self.shared_projects.is_empty()
109 }
110
111 #[cfg(any(test, feature = "test-support"))]
112 pub fn is_connected(&self) -> bool {
113 if let Some(live_kit) = self.live_kit.as_ref() {
114 matches!(
115 *live_kit.room.status().borrow(),
116 live_kit_client::ConnectionState::Connected { .. }
117 )
118 } else {
119 false
120 }
121 }
122
123 fn new(
124 id: u64,
125 channel_id: Option<u64>,
126 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
127 client: Arc<Client>,
128 user_store: ModelHandle<UserStore>,
129 cx: &mut ModelContext<Self>,
130 ) -> Self {
131 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
132 let room = live_kit_client::Room::new();
133 let mut status = room.status();
134 // Consume the initial status of the room.
135 let _ = status.try_recv();
136 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
137 while let Some(status) = status.next().await {
138 let this = if let Some(this) = this.upgrade(&cx) {
139 this
140 } else {
141 break;
142 };
143
144 if status == live_kit_client::ConnectionState::Disconnected {
145 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
146 break;
147 }
148 }
149 });
150
151 let mut track_video_changes = room.remote_video_track_updates();
152 let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
153 while let Some(track_change) = track_video_changes.next().await {
154 let this = if let Some(this) = this.upgrade(&cx) {
155 this
156 } else {
157 break;
158 };
159
160 this.update(&mut cx, |this, cx| {
161 this.remote_video_track_updated(track_change, cx).log_err()
162 });
163 }
164 });
165
166 let mut track_audio_changes = room.remote_audio_track_updates();
167 let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
168 while let Some(track_change) = track_audio_changes.next().await {
169 let this = if let Some(this) = this.upgrade(&cx) {
170 this
171 } else {
172 break;
173 };
174
175 this.update(&mut cx, |this, cx| {
176 this.remote_audio_track_updated(track_change, cx).log_err()
177 });
178 }
179 });
180
181 let connect = room.connect(&connection_info.server_url, &connection_info.token);
182 cx.spawn(|this, mut cx| async move {
183 connect.await?;
184
185 if !cx.read(Self::mute_on_join) {
186 this.update(&mut cx, |this, cx| this.share_microphone(cx))
187 .await?;
188 }
189
190 anyhow::Ok(())
191 })
192 .detach_and_log_err(cx);
193
194 Some(LiveKitRoom {
195 room,
196 screen_track: LocalTrack::None,
197 microphone_track: LocalTrack::None,
198 next_publish_id: 0,
199 muted_by_user: false,
200 deafened: false,
201 speaking: false,
202 _maintain_room,
203 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
204 })
205 } else {
206 None
207 };
208
209 let maintain_connection =
210 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
211
212 Audio::play_sound(Sound::Joined, cx);
213
214 Self {
215 id,
216 channel_id,
217 live_kit: live_kit_room,
218 status: RoomStatus::Online,
219 shared_projects: Default::default(),
220 joined_projects: Default::default(),
221 participant_user_ids: Default::default(),
222 local_participant: Default::default(),
223 remote_participants: Default::default(),
224 pending_participants: Default::default(),
225 pending_call_count: 0,
226 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
227 leave_when_empty: false,
228 pending_room_update: None,
229 client,
230 user_store,
231 follows_by_leader_id_project_id: Default::default(),
232 maintain_connection: Some(maintain_connection),
233 }
234 }
235
236 pub(crate) fn create(
237 called_user_id: u64,
238 initial_project: Option<ModelHandle<Project>>,
239 client: Arc<Client>,
240 user_store: ModelHandle<UserStore>,
241 cx: &mut AppContext,
242 ) -> Task<Result<ModelHandle<Self>>> {
243 cx.spawn(|mut cx| async move {
244 let response = client.request(proto::CreateRoom {}).await?;
245 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
246 let room = cx.add_model(|cx| {
247 Self::new(
248 room_proto.id,
249 None,
250 response.live_kit_connection_info,
251 client,
252 user_store,
253 cx,
254 )
255 });
256
257 let initial_project_id = if let Some(initial_project) = initial_project {
258 let initial_project_id = room
259 .update(&mut cx, |room, cx| {
260 room.share_project(initial_project.clone(), cx)
261 })
262 .await?;
263 Some(initial_project_id)
264 } else {
265 None
266 };
267
268 match room
269 .update(&mut cx, |room, cx| {
270 room.leave_when_empty = true;
271 room.call(called_user_id, initial_project_id, cx)
272 })
273 .await
274 {
275 Ok(()) => Ok(room),
276 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
277 }
278 })
279 }
280
281 pub(crate) fn join_channel(
282 channel_id: u64,
283 client: Arc<Client>,
284 user_store: ModelHandle<UserStore>,
285 cx: &mut AppContext,
286 ) -> Task<Result<ModelHandle<Self>>> {
287 cx.spawn(|cx| async move {
288 Self::from_join_response(
289 client.request(proto::JoinChannel { channel_id }).await?,
290 client,
291 user_store,
292 cx,
293 )
294 })
295 }
296
297 pub(crate) fn join(
298 call: &IncomingCall,
299 client: Arc<Client>,
300 user_store: ModelHandle<UserStore>,
301 cx: &mut AppContext,
302 ) -> Task<Result<ModelHandle<Self>>> {
303 let id = call.room_id;
304 cx.spawn(|cx| async move {
305 Self::from_join_response(
306 client.request(proto::JoinRoom { id }).await?,
307 client,
308 user_store,
309 cx,
310 )
311 })
312 }
313
314 pub fn mute_on_join(cx: &AppContext) -> bool {
315 settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
316 }
317
318 fn from_join_response(
319 response: proto::JoinRoomResponse,
320 client: Arc<Client>,
321 user_store: ModelHandle<UserStore>,
322 mut cx: AsyncAppContext,
323 ) -> Result<ModelHandle<Self>> {
324 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
325 let room = cx.add_model(|cx| {
326 Self::new(
327 room_proto.id,
328 response.channel_id,
329 response.live_kit_connection_info,
330 client,
331 user_store,
332 cx,
333 )
334 });
335 room.update(&mut cx, |room, cx| {
336 room.leave_when_empty = room.channel_id.is_none();
337 room.apply_room_update(room_proto, cx)?;
338 anyhow::Ok(())
339 })?;
340 Ok(room)
341 }
342
343 fn should_leave(&self) -> bool {
344 self.leave_when_empty
345 && self.pending_room_update.is_none()
346 && self.pending_participants.is_empty()
347 && self.remote_participants.is_empty()
348 && self.pending_call_count == 0
349 }
350
351 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
352 cx.notify();
353 cx.emit(Event::Left);
354 self.leave_internal(cx)
355 }
356
357 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
358 if self.status.is_offline() {
359 return Task::ready(Err(anyhow!("room is offline")));
360 }
361
362 log::info!("leaving room");
363 Audio::play_sound(Sound::Leave, cx);
364
365 self.clear_state(cx);
366
367 let leave_room = self.client.request(proto::LeaveRoom {});
368 cx.background().spawn(async move {
369 leave_room.await?;
370 anyhow::Ok(())
371 })
372 }
373
374 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
375 for project in self.shared_projects.drain() {
376 if let Some(project) = project.upgrade(cx) {
377 project.update(cx, |project, cx| {
378 project.unshare(cx).log_err();
379 });
380 }
381 }
382 for project in self.joined_projects.drain() {
383 if let Some(project) = project.upgrade(cx) {
384 project.update(cx, |project, cx| {
385 project.disconnected_from_host(cx);
386 project.close(cx);
387 });
388 }
389 }
390
391 self.status = RoomStatus::Offline;
392 self.remote_participants.clear();
393 self.pending_participants.clear();
394 self.participant_user_ids.clear();
395 self.subscriptions.clear();
396 self.live_kit.take();
397 self.pending_room_update.take();
398 self.maintain_connection.take();
399 }
400
401 async fn maintain_connection(
402 this: WeakModelHandle<Self>,
403 client: Arc<Client>,
404 mut cx: AsyncAppContext,
405 ) -> Result<()> {
406 let mut client_status = client.status();
407 loop {
408 let _ = client_status.try_recv();
409 let is_connected = client_status.borrow().is_connected();
410 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
411 if !is_connected || client_status.next().await.is_some() {
412 log::info!("detected client disconnection");
413
414 this.upgrade(&cx)
415 .ok_or_else(|| anyhow!("room was dropped"))?
416 .update(&mut cx, |this, cx| {
417 this.status = RoomStatus::Rejoining;
418 cx.notify();
419 });
420
421 // Wait for client to re-establish a connection to the server.
422 {
423 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
424 let client_reconnection = async {
425 let mut remaining_attempts = 3;
426 while remaining_attempts > 0 {
427 if client_status.borrow().is_connected() {
428 log::info!("client reconnected, attempting to rejoin room");
429
430 let Some(this) = this.upgrade(&cx) else { break };
431 if this
432 .update(&mut cx, |this, cx| this.rejoin(cx))
433 .await
434 .log_err()
435 .is_some()
436 {
437 return true;
438 } else {
439 remaining_attempts -= 1;
440 }
441 } else if client_status.borrow().is_signed_out() {
442 return false;
443 }
444
445 log::info!(
446 "waiting for client status change, remaining attempts {}",
447 remaining_attempts
448 );
449 client_status.next().await;
450 }
451 false
452 }
453 .fuse();
454 futures::pin_mut!(client_reconnection);
455
456 futures::select_biased! {
457 reconnected = client_reconnection => {
458 if reconnected {
459 log::info!("successfully reconnected to room");
460 // If we successfully joined the room, go back around the loop
461 // waiting for future connection status changes.
462 continue;
463 }
464 }
465 _ = reconnection_timeout => {
466 log::info!("room reconnection timeout expired");
467 }
468 }
469 }
470
471 break;
472 }
473 }
474
475 // The client failed to re-establish a connection to the server
476 // or an error occurred while trying to re-join the room. Either way
477 // we leave the room and return an error.
478 if let Some(this) = this.upgrade(&cx) {
479 log::info!("reconnection failed, leaving room");
480 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
481 }
482 Err(anyhow!(
483 "can't reconnect to room: client failed to re-establish connection"
484 ))
485 }
486
487 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
488 let mut projects = HashMap::default();
489 let mut reshared_projects = Vec::new();
490 let mut rejoined_projects = Vec::new();
491 self.shared_projects.retain(|project| {
492 if let Some(handle) = project.upgrade(cx) {
493 let project = handle.read(cx);
494 if let Some(project_id) = project.remote_id() {
495 projects.insert(project_id, handle.clone());
496 reshared_projects.push(proto::UpdateProject {
497 project_id,
498 worktrees: project.worktree_metadata_protos(cx),
499 });
500 return true;
501 }
502 }
503 false
504 });
505 self.joined_projects.retain(|project| {
506 if let Some(handle) = project.upgrade(cx) {
507 let project = handle.read(cx);
508 if let Some(project_id) = project.remote_id() {
509 projects.insert(project_id, handle.clone());
510 rejoined_projects.push(proto::RejoinProject {
511 id: project_id,
512 worktrees: project
513 .worktrees(cx)
514 .map(|worktree| {
515 let worktree = worktree.read(cx);
516 proto::RejoinWorktree {
517 id: worktree.id().to_proto(),
518 scan_id: worktree.completed_scan_id() as u64,
519 }
520 })
521 .collect(),
522 });
523 }
524 return true;
525 }
526 false
527 });
528
529 let response = self.client.request_envelope(proto::RejoinRoom {
530 id: self.id,
531 reshared_projects,
532 rejoined_projects,
533 });
534
535 cx.spawn(|this, mut cx| async move {
536 let response = response.await?;
537 let message_id = response.message_id;
538 let response = response.payload;
539 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
540 this.update(&mut cx, |this, cx| {
541 this.status = RoomStatus::Online;
542 this.apply_room_update(room_proto, cx)?;
543
544 for reshared_project in response.reshared_projects {
545 if let Some(project) = projects.get(&reshared_project.id) {
546 project.update(cx, |project, cx| {
547 project.reshared(reshared_project, cx).log_err();
548 });
549 }
550 }
551
552 for rejoined_project in response.rejoined_projects {
553 if let Some(project) = projects.get(&rejoined_project.id) {
554 project.update(cx, |project, cx| {
555 project.rejoined(rejoined_project, message_id, cx).log_err();
556 });
557 }
558 }
559
560 anyhow::Ok(())
561 })
562 })
563 }
564
565 pub fn id(&self) -> u64 {
566 self.id
567 }
568
569 pub fn status(&self) -> RoomStatus {
570 self.status
571 }
572
573 pub fn local_participant(&self) -> &LocalParticipant {
574 &self.local_participant
575 }
576
577 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
578 &self.remote_participants
579 }
580
581 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
582 self.remote_participants
583 .values()
584 .find(|p| p.peer_id == peer_id)
585 }
586
587 pub fn pending_participants(&self) -> &[Arc<User>] {
588 &self.pending_participants
589 }
590
591 pub fn contains_participant(&self, user_id: u64) -> bool {
592 self.participant_user_ids.contains(&user_id)
593 }
594
595 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
596 self.follows_by_leader_id_project_id
597 .get(&(leader_id, project_id))
598 .map_or(&[], |v| v.as_slice())
599 }
600
601 /// Returns the most 'active' projects, defined as most people in the project
602 pub fn most_active_project(&self) -> Option<(u64, u64)> {
603 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
604 for participant in self.remote_participants.values() {
605 match participant.location {
606 ParticipantLocation::SharedProject { project_id } => {
607 project_hosts_and_guest_counts
608 .entry(project_id)
609 .or_default()
610 .1 += 1;
611 }
612 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
613 }
614 for project in &participant.projects {
615 project_hosts_and_guest_counts
616 .entry(project.id)
617 .or_default()
618 .0 = Some(participant.user.id);
619 }
620 }
621
622 project_hosts_and_guest_counts
623 .into_iter()
624 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
625 .max_by_key(|(_, _, guest_count)| *guest_count)
626 .map(|(id, host, _)| (id, host))
627 }
628
629 async fn handle_room_updated(
630 this: ModelHandle<Self>,
631 envelope: TypedEnvelope<proto::RoomUpdated>,
632 _: Arc<Client>,
633 mut cx: AsyncAppContext,
634 ) -> Result<()> {
635 let room = envelope
636 .payload
637 .room
638 .ok_or_else(|| anyhow!("invalid room"))?;
639 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
640 }
641
642 fn apply_room_update(
643 &mut self,
644 mut room: proto::Room,
645 cx: &mut ModelContext<Self>,
646 ) -> Result<()> {
647 // Filter ourselves out from the room's participants.
648 let local_participant_ix = room
649 .participants
650 .iter()
651 .position(|participant| Some(participant.user_id) == self.client.user_id());
652 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
653
654 let pending_participant_user_ids = room
655 .pending_participants
656 .iter()
657 .map(|p| p.user_id)
658 .collect::<Vec<_>>();
659
660 let remote_participant_user_ids = room
661 .participants
662 .iter()
663 .map(|p| p.user_id)
664 .collect::<Vec<_>>();
665
666 let (remote_participants, pending_participants) =
667 self.user_store.update(cx, move |user_store, cx| {
668 (
669 user_store.get_users(remote_participant_user_ids, cx),
670 user_store.get_users(pending_participant_user_ids, cx),
671 )
672 });
673
674 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
675 let (remote_participants, pending_participants) =
676 futures::join!(remote_participants, pending_participants);
677
678 this.update(&mut cx, |this, cx| {
679 this.participant_user_ids.clear();
680
681 if let Some(participant) = local_participant {
682 this.local_participant.projects = participant.projects;
683 } else {
684 this.local_participant.projects.clear();
685 }
686
687 if let Some(participants) = remote_participants.log_err() {
688 for (participant, user) in room.participants.into_iter().zip(participants) {
689 let Some(peer_id) = participant.peer_id else {
690 continue;
691 };
692 let participant_index = ParticipantIndex(participant.participant_index);
693 this.participant_user_ids.insert(participant.user_id);
694
695 let old_projects = this
696 .remote_participants
697 .get(&participant.user_id)
698 .into_iter()
699 .flat_map(|existing| &existing.projects)
700 .map(|project| project.id)
701 .collect::<HashSet<_>>();
702 let new_projects = participant
703 .projects
704 .iter()
705 .map(|project| project.id)
706 .collect::<HashSet<_>>();
707
708 for project in &participant.projects {
709 if !old_projects.contains(&project.id) {
710 cx.emit(Event::RemoteProjectShared {
711 owner: user.clone(),
712 project_id: project.id,
713 worktree_root_names: project.worktree_root_names.clone(),
714 });
715 }
716 }
717
718 for unshared_project_id in old_projects.difference(&new_projects) {
719 this.joined_projects.retain(|project| {
720 if let Some(project) = project.upgrade(cx) {
721 project.update(cx, |project, cx| {
722 if project.remote_id() == Some(*unshared_project_id) {
723 project.disconnected_from_host(cx);
724 false
725 } else {
726 true
727 }
728 })
729 } else {
730 false
731 }
732 });
733 cx.emit(Event::RemoteProjectUnshared {
734 project_id: *unshared_project_id,
735 });
736 }
737
738 let location = ParticipantLocation::from_proto(participant.location)
739 .unwrap_or(ParticipantLocation::External);
740 if let Some(remote_participant) =
741 this.remote_participants.get_mut(&participant.user_id)
742 {
743 remote_participant.peer_id = peer_id;
744 remote_participant.projects = participant.projects;
745 remote_participant.participant_index = participant_index;
746 if location != remote_participant.location {
747 remote_participant.location = location;
748 cx.emit(Event::ParticipantLocationChanged {
749 participant_id: peer_id,
750 });
751 }
752 } else {
753 this.remote_participants.insert(
754 participant.user_id,
755 RemoteParticipant {
756 user: user.clone(),
757 participant_index,
758 peer_id,
759 projects: participant.projects,
760 location,
761 muted: true,
762 speaking: false,
763 video_tracks: Default::default(),
764 audio_tracks: Default::default(),
765 },
766 );
767
768 Audio::play_sound(Sound::Joined, cx);
769
770 if let Some(live_kit) = this.live_kit.as_ref() {
771 let video_tracks =
772 live_kit.room.remote_video_tracks(&user.id.to_string());
773 let audio_tracks =
774 live_kit.room.remote_audio_tracks(&user.id.to_string());
775 let publications = live_kit
776 .room
777 .remote_audio_track_publications(&user.id.to_string());
778
779 for track in video_tracks {
780 this.remote_video_track_updated(
781 RemoteVideoTrackUpdate::Subscribed(track),
782 cx,
783 )
784 .log_err();
785 }
786
787 for (track, publication) in
788 audio_tracks.iter().zip(publications.iter())
789 {
790 this.remote_audio_track_updated(
791 RemoteAudioTrackUpdate::Subscribed(
792 track.clone(),
793 publication.clone(),
794 ),
795 cx,
796 )
797 .log_err();
798 }
799 }
800 }
801 }
802
803 this.remote_participants.retain(|user_id, participant| {
804 if this.participant_user_ids.contains(user_id) {
805 true
806 } else {
807 for project in &participant.projects {
808 cx.emit(Event::RemoteProjectUnshared {
809 project_id: project.id,
810 });
811 }
812 false
813 }
814 });
815 }
816
817 if let Some(pending_participants) = pending_participants.log_err() {
818 this.pending_participants = pending_participants;
819 for participant in &this.pending_participants {
820 this.participant_user_ids.insert(participant.id);
821 }
822 }
823
824 this.follows_by_leader_id_project_id.clear();
825 for follower in room.followers {
826 let project_id = follower.project_id;
827 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
828 (Some(leader), Some(follower)) => (leader, follower),
829
830 _ => {
831 log::error!("Follower message {follower:?} missing some state");
832 continue;
833 }
834 };
835
836 let list = this
837 .follows_by_leader_id_project_id
838 .entry((leader, project_id))
839 .or_insert(Vec::new());
840 if !list.contains(&follower) {
841 list.push(follower);
842 }
843 }
844
845 this.pending_room_update.take();
846 if this.should_leave() {
847 log::info!("room is empty, leaving");
848 let _ = this.leave(cx);
849 }
850
851 this.user_store.update(cx, |user_store, cx| {
852 let participant_indices_by_user_id = this
853 .remote_participants
854 .iter()
855 .map(|(user_id, participant)| (*user_id, participant.participant_index))
856 .collect();
857 user_store.set_participant_indices(participant_indices_by_user_id, cx);
858 });
859
860 this.check_invariants();
861 cx.notify();
862 });
863 }));
864
865 cx.notify();
866 Ok(())
867 }
868
869 fn remote_video_track_updated(
870 &mut self,
871 change: RemoteVideoTrackUpdate,
872 cx: &mut ModelContext<Self>,
873 ) -> Result<()> {
874 match change {
875 RemoteVideoTrackUpdate::Subscribed(track) => {
876 let user_id = track.publisher_id().parse()?;
877 let track_id = track.sid().to_string();
878 let participant = self
879 .remote_participants
880 .get_mut(&user_id)
881 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
882 participant.video_tracks.insert(
883 track_id.clone(),
884 Arc::new(RemoteVideoTrack {
885 live_kit_track: track,
886 }),
887 );
888 cx.emit(Event::RemoteVideoTracksChanged {
889 participant_id: participant.peer_id,
890 });
891 }
892 RemoteVideoTrackUpdate::Unsubscribed {
893 publisher_id,
894 track_id,
895 } => {
896 let user_id = publisher_id.parse()?;
897 let participant = self
898 .remote_participants
899 .get_mut(&user_id)
900 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
901 participant.video_tracks.remove(&track_id);
902 cx.emit(Event::RemoteVideoTracksChanged {
903 participant_id: participant.peer_id,
904 });
905 }
906 }
907
908 cx.notify();
909 Ok(())
910 }
911
912 fn remote_audio_track_updated(
913 &mut self,
914 change: RemoteAudioTrackUpdate,
915 cx: &mut ModelContext<Self>,
916 ) -> Result<()> {
917 match change {
918 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
919 let mut speaker_ids = speakers
920 .into_iter()
921 .filter_map(|speaker_sid| speaker_sid.parse().ok())
922 .collect::<Vec<u64>>();
923 speaker_ids.sort_unstable();
924 for (sid, participant) in &mut self.remote_participants {
925 if let Ok(_) = speaker_ids.binary_search(sid) {
926 participant.speaking = true;
927 } else {
928 participant.speaking = false;
929 }
930 }
931 if let Some(id) = self.client.user_id() {
932 if let Some(room) = &mut self.live_kit {
933 if let Ok(_) = speaker_ids.binary_search(&id) {
934 room.speaking = true;
935 } else {
936 room.speaking = false;
937 }
938 }
939 }
940 cx.notify();
941 }
942 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
943 let mut found = false;
944 for participant in &mut self.remote_participants.values_mut() {
945 for track in participant.audio_tracks.values() {
946 if track.sid() == track_id {
947 found = true;
948 break;
949 }
950 }
951 if found {
952 participant.muted = muted;
953 break;
954 }
955 }
956
957 cx.notify();
958 }
959 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
960 let user_id = track.publisher_id().parse()?;
961 let track_id = track.sid().to_string();
962 let participant = self
963 .remote_participants
964 .get_mut(&user_id)
965 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
966
967 participant.audio_tracks.insert(track_id.clone(), track);
968 participant.muted = publication.is_muted();
969
970 cx.emit(Event::RemoteAudioTracksChanged {
971 participant_id: participant.peer_id,
972 });
973 }
974 RemoteAudioTrackUpdate::Unsubscribed {
975 publisher_id,
976 track_id,
977 } => {
978 let user_id = publisher_id.parse()?;
979 let participant = self
980 .remote_participants
981 .get_mut(&user_id)
982 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
983 participant.audio_tracks.remove(&track_id);
984 cx.emit(Event::RemoteAudioTracksChanged {
985 participant_id: participant.peer_id,
986 });
987 }
988 }
989
990 cx.notify();
991 Ok(())
992 }
993
994 fn check_invariants(&self) {
995 #[cfg(any(test, feature = "test-support"))]
996 {
997 for participant in self.remote_participants.values() {
998 assert!(self.participant_user_ids.contains(&participant.user.id));
999 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1000 }
1001
1002 for participant in &self.pending_participants {
1003 assert!(self.participant_user_ids.contains(&participant.id));
1004 assert_ne!(participant.id, self.client.user_id().unwrap());
1005 }
1006
1007 assert_eq!(
1008 self.participant_user_ids.len(),
1009 self.remote_participants.len() + self.pending_participants.len()
1010 );
1011 }
1012 }
1013
1014 pub(crate) fn call(
1015 &mut self,
1016 called_user_id: u64,
1017 initial_project_id: Option<u64>,
1018 cx: &mut ModelContext<Self>,
1019 ) -> Task<Result<()>> {
1020 if self.status.is_offline() {
1021 return Task::ready(Err(anyhow!("room is offline")));
1022 }
1023
1024 cx.notify();
1025 let client = self.client.clone();
1026 let room_id = self.id;
1027 self.pending_call_count += 1;
1028 cx.spawn(|this, mut cx| async move {
1029 let result = client
1030 .request(proto::Call {
1031 room_id,
1032 called_user_id,
1033 initial_project_id,
1034 })
1035 .await;
1036 this.update(&mut cx, |this, cx| {
1037 this.pending_call_count -= 1;
1038 if this.should_leave() {
1039 this.leave(cx).detach_and_log_err(cx);
1040 }
1041 });
1042 result?;
1043 Ok(())
1044 })
1045 }
1046
1047 pub fn join_project(
1048 &mut self,
1049 id: u64,
1050 language_registry: Arc<LanguageRegistry>,
1051 fs: Arc<dyn Fs>,
1052 cx: &mut ModelContext<Self>,
1053 ) -> Task<Result<ModelHandle<Project>>> {
1054 let client = self.client.clone();
1055 let user_store = self.user_store.clone();
1056 cx.emit(Event::RemoteProjectJoined { project_id: id });
1057 cx.spawn(|this, mut cx| async move {
1058 let project =
1059 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1060
1061 this.update(&mut cx, |this, cx| {
1062 this.joined_projects.retain(|project| {
1063 if let Some(project) = project.upgrade(cx) {
1064 !project.read(cx).is_read_only()
1065 } else {
1066 false
1067 }
1068 });
1069 this.joined_projects.insert(project.downgrade());
1070 });
1071 Ok(project)
1072 })
1073 }
1074
1075 pub(crate) fn share_project(
1076 &mut self,
1077 project: ModelHandle<Project>,
1078 cx: &mut ModelContext<Self>,
1079 ) -> Task<Result<u64>> {
1080 if let Some(project_id) = project.read(cx).remote_id() {
1081 return Task::ready(Ok(project_id));
1082 }
1083
1084 let request = self.client.request(proto::ShareProject {
1085 room_id: self.id(),
1086 worktrees: project.read(cx).worktree_metadata_protos(cx),
1087 });
1088 cx.spawn(|this, mut cx| async move {
1089 let response = request.await?;
1090
1091 project.update(&mut cx, |project, cx| {
1092 project.shared(response.project_id, cx)
1093 })?;
1094
1095 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1096 this.update(&mut cx, |this, cx| {
1097 this.shared_projects.insert(project.downgrade());
1098 let active_project = this.local_participant.active_project.as_ref();
1099 if active_project.map_or(false, |location| *location == project) {
1100 this.set_location(Some(&project), cx)
1101 } else {
1102 Task::ready(Ok(()))
1103 }
1104 })
1105 .await?;
1106
1107 Ok(response.project_id)
1108 })
1109 }
1110
1111 pub(crate) fn unshare_project(
1112 &mut self,
1113 project: ModelHandle<Project>,
1114 cx: &mut ModelContext<Self>,
1115 ) -> Result<()> {
1116 let project_id = match project.read(cx).remote_id() {
1117 Some(project_id) => project_id,
1118 None => return Ok(()),
1119 };
1120
1121 self.client.send(proto::UnshareProject { project_id })?;
1122 project.update(cx, |this, cx| this.unshare(cx))
1123 }
1124
1125 pub(crate) fn set_location(
1126 &mut self,
1127 project: Option<&ModelHandle<Project>>,
1128 cx: &mut ModelContext<Self>,
1129 ) -> Task<Result<()>> {
1130 if self.status.is_offline() {
1131 return Task::ready(Err(anyhow!("room is offline")));
1132 }
1133
1134 let client = self.client.clone();
1135 let room_id = self.id;
1136 let location = if let Some(project) = project {
1137 self.local_participant.active_project = Some(project.downgrade());
1138 if let Some(project_id) = project.read(cx).remote_id() {
1139 proto::participant_location::Variant::SharedProject(
1140 proto::participant_location::SharedProject { id: project_id },
1141 )
1142 } else {
1143 proto::participant_location::Variant::UnsharedProject(
1144 proto::participant_location::UnsharedProject {},
1145 )
1146 }
1147 } else {
1148 self.local_participant.active_project = None;
1149 proto::participant_location::Variant::External(proto::participant_location::External {})
1150 };
1151
1152 cx.notify();
1153 cx.foreground().spawn(async move {
1154 client
1155 .request(proto::UpdateParticipantLocation {
1156 room_id,
1157 location: Some(proto::ParticipantLocation {
1158 variant: Some(location),
1159 }),
1160 })
1161 .await?;
1162 Ok(())
1163 })
1164 }
1165
1166 pub fn is_screen_sharing(&self) -> bool {
1167 self.live_kit.as_ref().map_or(false, |live_kit| {
1168 !matches!(live_kit.screen_track, LocalTrack::None)
1169 })
1170 }
1171
1172 pub fn is_sharing_mic(&self) -> bool {
1173 self.live_kit.as_ref().map_or(false, |live_kit| {
1174 !matches!(live_kit.microphone_track, LocalTrack::None)
1175 })
1176 }
1177
1178 pub fn is_muted(&self, cx: &AppContext) -> bool {
1179 self.live_kit
1180 .as_ref()
1181 .and_then(|live_kit| match &live_kit.microphone_track {
1182 LocalTrack::None => Some(Self::mute_on_join(cx)),
1183 LocalTrack::Pending { muted, .. } => Some(*muted),
1184 LocalTrack::Published { muted, .. } => Some(*muted),
1185 })
1186 .unwrap_or(false)
1187 }
1188
1189 pub fn is_speaking(&self) -> bool {
1190 self.live_kit
1191 .as_ref()
1192 .map_or(false, |live_kit| live_kit.speaking)
1193 }
1194
1195 pub fn is_deafened(&self) -> Option<bool> {
1196 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1197 }
1198
1199 #[track_caller]
1200 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1201 if self.status.is_offline() {
1202 return Task::ready(Err(anyhow!("room is offline")));
1203 } else if self.is_sharing_mic() {
1204 return Task::ready(Err(anyhow!("microphone was already shared")));
1205 }
1206
1207 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1208 let publish_id = post_inc(&mut live_kit.next_publish_id);
1209 live_kit.microphone_track = LocalTrack::Pending {
1210 publish_id,
1211 muted: false,
1212 };
1213 cx.notify();
1214 publish_id
1215 } else {
1216 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1217 };
1218
1219 cx.spawn_weak(|this, mut cx| async move {
1220 let publish_track = async {
1221 let track = LocalAudioTrack::create();
1222 this.upgrade(&cx)
1223 .ok_or_else(|| anyhow!("room was dropped"))?
1224 .read_with(&cx, |this, _| {
1225 this.live_kit
1226 .as_ref()
1227 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1228 })
1229 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1230 .await
1231 };
1232
1233 let publication = publish_track.await;
1234 this.upgrade(&cx)
1235 .ok_or_else(|| anyhow!("room was dropped"))?
1236 .update(&mut cx, |this, cx| {
1237 let live_kit = this
1238 .live_kit
1239 .as_mut()
1240 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1241
1242 let (canceled, muted) = if let LocalTrack::Pending {
1243 publish_id: cur_publish_id,
1244 muted,
1245 } = &live_kit.microphone_track
1246 {
1247 (*cur_publish_id != publish_id, *muted)
1248 } else {
1249 (true, false)
1250 };
1251
1252 match publication {
1253 Ok(publication) => {
1254 if canceled {
1255 live_kit.room.unpublish_track(publication);
1256 } else {
1257 if muted {
1258 cx.background().spawn(publication.set_mute(muted)).detach();
1259 }
1260 live_kit.microphone_track = LocalTrack::Published {
1261 track_publication: publication,
1262 muted,
1263 };
1264 cx.notify();
1265 }
1266 Ok(())
1267 }
1268 Err(error) => {
1269 if canceled {
1270 Ok(())
1271 } else {
1272 live_kit.microphone_track = LocalTrack::None;
1273 cx.notify();
1274 Err(error)
1275 }
1276 }
1277 }
1278 })
1279 })
1280 }
1281
1282 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1283 if self.status.is_offline() {
1284 return Task::ready(Err(anyhow!("room is offline")));
1285 } else if self.is_screen_sharing() {
1286 return Task::ready(Err(anyhow!("screen was already shared")));
1287 }
1288
1289 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1290 let publish_id = post_inc(&mut live_kit.next_publish_id);
1291 live_kit.screen_track = LocalTrack::Pending {
1292 publish_id,
1293 muted: false,
1294 };
1295 cx.notify();
1296 (live_kit.room.display_sources(), publish_id)
1297 } else {
1298 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1299 };
1300
1301 cx.spawn_weak(|this, mut cx| async move {
1302 let publish_track = async {
1303 let displays = displays.await?;
1304 let display = displays
1305 .first()
1306 .ok_or_else(|| anyhow!("no display found"))?;
1307 let track = LocalVideoTrack::screen_share_for_display(&display);
1308 this.upgrade(&cx)
1309 .ok_or_else(|| anyhow!("room was dropped"))?
1310 .read_with(&cx, |this, _| {
1311 this.live_kit
1312 .as_ref()
1313 .map(|live_kit| live_kit.room.publish_video_track(&track))
1314 })
1315 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1316 .await
1317 };
1318
1319 let publication = publish_track.await;
1320 this.upgrade(&cx)
1321 .ok_or_else(|| anyhow!("room was dropped"))?
1322 .update(&mut cx, |this, cx| {
1323 let live_kit = this
1324 .live_kit
1325 .as_mut()
1326 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1327
1328 let (canceled, muted) = if let LocalTrack::Pending {
1329 publish_id: cur_publish_id,
1330 muted,
1331 } = &live_kit.screen_track
1332 {
1333 (*cur_publish_id != publish_id, *muted)
1334 } else {
1335 (true, false)
1336 };
1337
1338 match publication {
1339 Ok(publication) => {
1340 if canceled {
1341 live_kit.room.unpublish_track(publication);
1342 } else {
1343 if muted {
1344 cx.background().spawn(publication.set_mute(muted)).detach();
1345 }
1346 live_kit.screen_track = LocalTrack::Published {
1347 track_publication: publication,
1348 muted,
1349 };
1350 cx.notify();
1351 }
1352
1353 Audio::play_sound(Sound::StartScreenshare, cx);
1354
1355 Ok(())
1356 }
1357 Err(error) => {
1358 if canceled {
1359 Ok(())
1360 } else {
1361 live_kit.screen_track = LocalTrack::None;
1362 cx.notify();
1363 Err(error)
1364 }
1365 }
1366 }
1367 })
1368 })
1369 }
1370
1371 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1372 let should_mute = !self.is_muted(cx);
1373 if let Some(live_kit) = self.live_kit.as_mut() {
1374 if matches!(live_kit.microphone_track, LocalTrack::None) {
1375 return Ok(self.share_microphone(cx));
1376 }
1377
1378 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1379 live_kit.muted_by_user = should_mute;
1380
1381 if old_muted == true && live_kit.deafened == true {
1382 if let Some(task) = self.toggle_deafen(cx).ok() {
1383 task.detach();
1384 }
1385 }
1386
1387 Ok(ret_task)
1388 } else {
1389 Err(anyhow!("LiveKit not started"))
1390 }
1391 }
1392
1393 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1394 if let Some(live_kit) = self.live_kit.as_mut() {
1395 (*live_kit).deafened = !live_kit.deafened;
1396
1397 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1398 // Context notification is sent within set_mute itself.
1399 let mut mute_task = None;
1400 // When deafening, mute user's mic as well.
1401 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1402 if live_kit.deafened || !live_kit.muted_by_user {
1403 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1404 };
1405 for participant in self.remote_participants.values() {
1406 for track in live_kit
1407 .room
1408 .remote_audio_track_publications(&participant.user.id.to_string())
1409 {
1410 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1411 }
1412 }
1413
1414 Ok(cx.foreground().spawn(async move {
1415 if let Some(mute_task) = mute_task {
1416 mute_task.await?;
1417 }
1418 for task in tasks {
1419 task.await?;
1420 }
1421 Ok(())
1422 }))
1423 } else {
1424 Err(anyhow!("LiveKit not started"))
1425 }
1426 }
1427
1428 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1429 if self.status.is_offline() {
1430 return Err(anyhow!("room is offline"));
1431 }
1432
1433 let live_kit = self
1434 .live_kit
1435 .as_mut()
1436 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1437 match mem::take(&mut live_kit.screen_track) {
1438 LocalTrack::None => Err(anyhow!("screen was not shared")),
1439 LocalTrack::Pending { .. } => {
1440 cx.notify();
1441 Ok(())
1442 }
1443 LocalTrack::Published {
1444 track_publication, ..
1445 } => {
1446 live_kit.room.unpublish_track(track_publication);
1447 cx.notify();
1448
1449 Audio::play_sound(Sound::StopScreenshare, cx);
1450 Ok(())
1451 }
1452 }
1453 }
1454
1455 #[cfg(any(test, feature = "test-support"))]
1456 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1457 self.live_kit
1458 .as_ref()
1459 .unwrap()
1460 .room
1461 .set_display_sources(sources);
1462 }
1463}
1464
1465struct LiveKitRoom {
1466 room: Arc<live_kit_client::Room>,
1467 screen_track: LocalTrack,
1468 microphone_track: LocalTrack,
1469 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1470 muted_by_user: bool,
1471 deafened: bool,
1472 speaking: bool,
1473 next_publish_id: usize,
1474 _maintain_room: Task<()>,
1475 _maintain_tracks: [Task<()>; 2],
1476}
1477
1478impl LiveKitRoom {
1479 fn set_mute(
1480 self: &mut LiveKitRoom,
1481 should_mute: bool,
1482 cx: &mut ModelContext<Room>,
1483 ) -> Result<(Task<Result<()>>, bool)> {
1484 if !should_mute {
1485 // clear user muting state.
1486 self.muted_by_user = false;
1487 }
1488
1489 let (result, old_muted) = match &mut self.microphone_track {
1490 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1491 LocalTrack::Pending { muted, .. } => {
1492 let old_muted = *muted;
1493 *muted = should_mute;
1494 cx.notify();
1495 Ok((Task::Ready(Some(Ok(()))), old_muted))
1496 }
1497 LocalTrack::Published {
1498 track_publication,
1499 muted,
1500 } => {
1501 let old_muted = *muted;
1502 *muted = should_mute;
1503 cx.notify();
1504 Ok((
1505 cx.background().spawn(track_publication.set_mute(*muted)),
1506 old_muted,
1507 ))
1508 }
1509 }?;
1510
1511 if old_muted != should_mute {
1512 if should_mute {
1513 Audio::play_sound(Sound::Mute, cx);
1514 } else {
1515 Audio::play_sound(Sound::Unmute, cx);
1516 }
1517 }
1518
1519 Ok((result, old_muted))
1520 }
1521}
1522
1523enum LocalTrack {
1524 None,
1525 Pending {
1526 publish_id: usize,
1527 muted: bool,
1528 },
1529 Published {
1530 track_publication: LocalTrackPublication,
1531 muted: bool,
1532 },
1533}
1534
1535impl Default for LocalTrack {
1536 fn default() -> Self {
1537 Self::None
1538 }
1539}
1540
1541#[derive(Copy, Clone, PartialEq, Eq)]
1542pub enum RoomStatus {
1543 Online,
1544 Rejoining,
1545 Offline,
1546}
1547
1548impl RoomStatus {
1549 pub fn is_offline(&self) -> bool {
1550 matches!(self, RoomStatus::Offline)
1551 }
1552
1553 pub fn is_online(&self) -> bool {
1554 matches!(self, RoomStatus::Online)
1555 }
1556}