1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A room update reported a location for an already-known remote
    /// participant that differs from the one previously stored.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track of the given remote participant was subscribed or
    /// unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// An audio track of the given remote participant was subscribed or
    /// unsubscribed.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not in
    /// that participant's previous project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously listed by a remote participant is no longer listed,
    /// or the participant listing it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::join_project` when a join is requested (before the
    /// remote project has finished loading).
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): not emitted anywhere in the visible part of this file;
    /// presumably raised elsewhere when an invitation to a project is dismissed.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave` right before the room is torn down.
    Left,
}
55
/// Client-side model of a call room: its participants, the projects shared in
/// it, and the optional LiveKit session used for audio and video.
pub struct Room {
    // Room id taken from the room proto returned by the server.
    id: u64,
    // Channel id carried by the join response; `None` for rooms made by `create`.
    channel_id: Option<u64>,
    // LiveKit session state; `None` when no connection info was provided or
    // after `clear_state` has run.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    // Projects the local user has shared into this room (see `share_project`).
    shared_projects: HashSet<WeakModelHandle<Project>>,
    // Remote projects the local user has joined (see `join_project`).
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    // Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    // Users listed as pending in the latest applied room update.
    pending_participants: Vec<Arc<User>>,
    // User ids of both remote and pending participants.
    participant_user_ids: HashSet<u64>,
    // Number of `Call` requests currently in flight.
    pending_call_count: usize,
    // When set, `should_leave` may report true once the room has emptied out.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    // Followers grouped by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Message-handler subscriptions registered on `client`; cleared on leave.
    subscriptions: Vec<client::Subscription>,
    // Task applying the most recent room update, if one is still running.
    pending_room_update: Option<Task<()>>,
    // Task running `maintain_connection`; dropped by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
76
77impl Entity for Room {
78 type Event = Event;
79
80 fn release(&mut self, cx: &mut AppContext) {
81 if self.status.is_online() {
82 self.leave_internal(cx).detach_and_log_err(cx);
83 }
84 }
85
86 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
87 if self.status.is_online() {
88 let leave = self.leave_internal(cx);
89 Some(
90 cx.background()
91 .spawn(async move {
92 leave.await.log_err();
93 })
94 .boxed(),
95 )
96 } else {
97 None
98 }
99 }
100}
101
102impl Room {
/// Returns the id of the channel this room belongs to, if any.
pub fn channel_id(&self) -> Option<u64> {
    self.channel_id
}
106
/// Test helper: reports whether the LiveKit room's current connection state is
/// `Connected`. Always `false` when the room has no LiveKit state.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    match self.live_kit.as_ref() {
        Some(live_kit) => matches!(
            *live_kit.room.status().borrow(),
            live_kit_client::ConnectionState::Connected { .. }
        ),
        None => false,
    }
}
118
/// Builds the state for a room the client has just entered.
///
/// When `live_kit_connection_info` is present, a LiveKit room is created,
/// tasks are spawned to watch its connection status and its remote video and
/// audio track updates, and a connection attempt is started in the background.
/// In every case a task running `Self::maintain_connection` is spawned, the
/// "joined" sound is played, and `Self::handle_room_updated` is registered as
/// a message handler on `client`. The room starts out `Online`.
fn new(
    id: u64,
    channel_id: Option<u64>,
    live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    cx: &mut ModelContext<Self>,
) -> Self {
    let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
        let room = live_kit_client::Room::new();
        let mut status = room.status();
        // Consume the initial status of the room.
        let _ = status.try_recv();
        // Leave the room as soon as LiveKit reports `Disconnected`; stop
        // watching once the room model itself is gone.
        let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
            while let Some(status) = status.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                if status == live_kit_client::ConnectionState::Disconnected {
                    this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                    break;
                }
            }
        });

        // Forward remote video track changes to the room while it is alive;
        // failures to apply a change are logged.
        let mut track_video_changes = room.remote_video_track_updates();
        let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_video_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_video_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Same as above, for remote audio track changes.
        let mut track_audio_changes = room.remote_audio_track_updates();
        let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_audio_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_audio_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Connect in the background and, unless the user joins muted, share
        // the microphone once connected. Errors are logged, not returned.
        let connect = room.connect(&connection_info.server_url, &connection_info.token);
        cx.spawn(|this, mut cx| async move {
            connect.await?;

            if !cx.read(Self::mute_on_join) {
                this.update(&mut cx, |this, cx| this.share_microphone(cx))
                    .await?;
            }

            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        // The watcher tasks are stored in the LiveKit state so they live
        // exactly as long as it does.
        Some(LiveKitRoom {
            room,
            screen_track: LocalTrack::None,
            microphone_track: LocalTrack::None,
            next_publish_id: 0,
            muted_by_user: false,
            deafened: false,
            speaking: false,
            _maintain_room,
            _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
        })
    } else {
        None
    };

    let maintain_connection =
        cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

    Audio::play_sound(Sound::Joined, cx);

    Self {
        id,
        channel_id,
        live_kit: live_kit_room,
        status: RoomStatus::Online,
        shared_projects: Default::default(),
        joined_projects: Default::default(),
        participant_user_ids: Default::default(),
        local_participant: Default::default(),
        remote_participants: Default::default(),
        pending_participants: Default::default(),
        pending_call_count: 0,
        subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
        leave_when_empty: false,
        pending_room_update: None,
        client,
        user_store,
        follows_by_leader_id_project_id: Default::default(),
        maintain_connection: Some(maintain_connection),
    }
}
231
232 pub(crate) fn create(
233 called_user_id: u64,
234 initial_project: Option<ModelHandle<Project>>,
235 client: Arc<Client>,
236 user_store: ModelHandle<UserStore>,
237 cx: &mut AppContext,
238 ) -> Task<Result<ModelHandle<Self>>> {
239 cx.spawn(|mut cx| async move {
240 let response = client.request(proto::CreateRoom {}).await?;
241 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
242 let room = cx.add_model(|cx| {
243 Self::new(
244 room_proto.id,
245 None,
246 response.live_kit_connection_info,
247 client,
248 user_store,
249 cx,
250 )
251 });
252
253 let initial_project_id = if let Some(initial_project) = initial_project {
254 let initial_project_id = room
255 .update(&mut cx, |room, cx| {
256 room.share_project(initial_project.clone(), cx)
257 })
258 .await?;
259 Some(initial_project_id)
260 } else {
261 None
262 };
263
264 match room
265 .update(&mut cx, |room, cx| {
266 room.leave_when_empty = true;
267 room.call(called_user_id, initial_project_id, cx)
268 })
269 .await
270 {
271 Ok(()) => Ok(room),
272 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
273 }
274 })
275 }
276
277 pub(crate) fn join_channel(
278 channel_id: u64,
279 client: Arc<Client>,
280 user_store: ModelHandle<UserStore>,
281 cx: &mut AppContext,
282 ) -> Task<Result<ModelHandle<Self>>> {
283 cx.spawn(|cx| async move {
284 Self::from_join_response(
285 client.request(proto::JoinChannel { channel_id }).await?,
286 client,
287 user_store,
288 cx,
289 )
290 })
291 }
292
293 pub(crate) fn join(
294 call: &IncomingCall,
295 client: Arc<Client>,
296 user_store: ModelHandle<UserStore>,
297 cx: &mut AppContext,
298 ) -> Task<Result<ModelHandle<Self>>> {
299 let id = call.room_id;
300 cx.spawn(|cx| async move {
301 Self::from_join_response(
302 client.request(proto::JoinRoom { id }).await?,
303 client,
304 user_store,
305 cx,
306 )
307 })
308 }
309
310 pub fn mute_on_join(cx: &AppContext) -> bool {
311 settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
312 }
313
314 fn from_join_response(
315 response: proto::JoinRoomResponse,
316 client: Arc<Client>,
317 user_store: ModelHandle<UserStore>,
318 mut cx: AsyncAppContext,
319 ) -> Result<ModelHandle<Self>> {
320 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
321 let room = cx.add_model(|cx| {
322 Self::new(
323 room_proto.id,
324 response.channel_id,
325 response.live_kit_connection_info,
326 client,
327 user_store,
328 cx,
329 )
330 });
331 room.update(&mut cx, |room, cx| {
332 room.leave_when_empty = room.channel_id.is_none();
333 room.apply_room_update(room_proto, cx)?;
334 anyhow::Ok(())
335 })?;
336 Ok(room)
337 }
338
339 fn should_leave(&self) -> bool {
340 self.leave_when_empty
341 && self.pending_room_update.is_none()
342 && self.pending_participants.is_empty()
343 && self.remote_participants.is_empty()
344 && self.pending_call_count == 0
345 }
346
/// Leaves the room, first notifying observers and emitting [`Event::Left`].
///
/// The notification and event are sent even when the room is already offline,
/// in which case the returned task fails with "room is offline".
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    cx.notify();
    cx.emit(Event::Left);
    self.leave_internal(cx)
}
352
/// Tears down local room state and sends a `LeaveRoom` request.
///
/// Returns a task failing with "room is offline" when the room is already
/// offline. Otherwise the "leave" sound is played, local state is cleared, and
/// the returned task resolves once the server has answered the request.
fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
    if self.status.is_offline() {
        return Task::ready(Err(anyhow!("room is offline")));
    }

    log::info!("leaving room");
    Audio::play_sound(Sound::Leave, cx);

    // Local state is reset before the request is issued, so the room is
    // already marked offline while the request is outstanding.
    self.clear_state(cx);

    let leave_room = self.client.request(proto::LeaveRoom {});
    cx.background().spawn(async move {
        leave_room.await?;
        anyhow::Ok(())
    })
}
369
370 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
371 for project in self.shared_projects.drain() {
372 if let Some(project) = project.upgrade(cx) {
373 project.update(cx, |project, cx| {
374 project.unshare(cx).log_err();
375 });
376 }
377 }
378 for project in self.joined_projects.drain() {
379 if let Some(project) = project.upgrade(cx) {
380 project.update(cx, |project, cx| {
381 project.disconnected_from_host(cx);
382 project.close(cx);
383 });
384 }
385 }
386
387 self.status = RoomStatus::Offline;
388 self.remote_participants.clear();
389 self.pending_participants.clear();
390 self.participant_user_ids.clear();
391 self.subscriptions.clear();
392 self.live_kit.take();
393 self.pending_room_update.take();
394 self.maintain_connection.take();
395 }
396
/// Watches the client's connection status and tries to rejoin the room after
/// the connection is lost.
///
/// On a detected disconnection the room is marked `Rejoining`; the function
/// then waits, for at most `RECONNECT_TIMEOUT`, for the client to reconnect and
/// for `rejoin` to succeed (up to three rejoin attempts, giving up right away
/// if the client is signed out). After a successful rejoin it goes back to
/// watching.
///
/// This function only ever returns an error: "room was dropped" when the room
/// model no longer exists, or a reconnection failure, in which case the room
/// (if still alive) is left first.
async fn maintain_connection(
    this: WeakModelHandle<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                });

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                // Resolves to `true` once the room was rejoined, `false` when
                // the attempts ran out, the client signed out, or the room
                // model went away.
                let client_reconnection = async {
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade(&cx) else { break };
                            if this
                                .update(&mut cx, |this, cx| this.rejoin(cx))
                                .await
                                .log_err()
                                .is_some()
                            {
                                return true;
                            } else {
                                remaining_attempts -= 1;
                            }
                        } else if client_status.borrow().is_signed_out() {
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // The reconnection future is listed first, so it is polled
                // before the timeout.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade(&cx) {
        log::info!("reconnection failed, leaving room");
        let _ = this.update(&mut cx, |this, cx| this.leave(cx));
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
482
/// Sends a `RejoinRoom` request describing the projects this client shares and
/// has joined, then applies the server's answer.
///
/// On success the room is marked `Online`, the returned room state is applied,
/// and every reshared or rejoined project found in the response is notified
/// (errors from those notifications are logged). Fails when the request fails
/// or the response carries no room.
fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    // Project handles by remote id, used to route the response back.
    let mut projects = HashMap::default();
    let mut reshared_projects = Vec::new();
    let mut rejoined_projects = Vec::new();
    // Shared projects that were dropped or have no remote id are forgotten.
    self.shared_projects.retain(|project| {
        if let Some(handle) = project.upgrade(cx) {
            let project = handle.read(cx);
            if let Some(project_id) = project.remote_id() {
                projects.insert(project_id, handle.clone());
                reshared_projects.push(proto::UpdateProject {
                    project_id,
                    worktrees: project.worktree_metadata_protos(cx),
                });
                return true;
            }
        }
        false
    });
    // Joined projects are kept as long as their handle is alive, even when
    // they have no remote id and therefore are not part of the request.
    self.joined_projects.retain(|project| {
        if let Some(handle) = project.upgrade(cx) {
            let project = handle.read(cx);
            if let Some(project_id) = project.remote_id() {
                projects.insert(project_id, handle.clone());
                rejoined_projects.push(proto::RejoinProject {
                    id: project_id,
                    worktrees: project
                        .worktrees(cx)
                        .map(|worktree| {
                            let worktree = worktree.read(cx);
                            proto::RejoinWorktree {
                                id: worktree.id().to_proto(),
                                scan_id: worktree.completed_scan_id() as u64,
                            }
                        })
                        .collect(),
                });
            }
            return true;
        }
        false
    });

    let response = self.client.request_envelope(proto::RejoinRoom {
        id: self.id,
        reshared_projects,
        rejoined_projects,
    });

    cx.spawn(|this, mut cx| async move {
        let response = response.await?;
        // The envelope's message id is handed to each rejoined project.
        let message_id = response.message_id;
        let response = response.payload;
        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
        this.update(&mut cx, |this, cx| {
            this.status = RoomStatus::Online;
            this.apply_room_update(room_proto, cx)?;

            for reshared_project in response.reshared_projects {
                if let Some(project) = projects.get(&reshared_project.id) {
                    project.update(cx, |project, cx| {
                        project.reshared(reshared_project, cx).log_err();
                    });
                }
            }

            for rejoined_project in response.rejoined_projects {
                if let Some(project) = projects.get(&rejoined_project.id) {
                    project.update(cx, |project, cx| {
                        project.rejoined(rejoined_project, message_id, cx).log_err();
                    });
                }
            }

            anyhow::Ok(())
        })
    })
}
560
/// Returns the room's id.
pub fn id(&self) -> u64 {
    self.id
}
564
/// Returns the room's current status.
pub fn status(&self) -> RoomStatus {
    self.status
}
568
/// Returns the state kept for the local user's own participation.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
572
/// Returns the remote participants currently in the room, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
576
577 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
578 self.remote_participants
579 .values()
580 .find(|p| p.peer_id == peer_id)
581 }
582
583 pub fn pending_participants(&self) -> &[Arc<User>] {
584 &self.pending_participants
585 }
586
/// Returns whether `user_id` belongs to a remote or pending participant.
pub fn contains_participant(&self, user_id: u64) -> bool {
    self.participant_user_ids.contains(&user_id)
}
590
591 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
592 self.follows_by_leader_id_project_id
593 .get(&(leader_id, project_id))
594 .map_or(&[], |v| v.as_slice())
595 }
596
597 /// projects_to_join returns a list of shared projects sorted such
598 /// that the most 'active' projects appear last.
599 pub fn projects_to_join(&self) -> Vec<(u64, u64)> {
600 let mut projects = HashMap::default();
601 let mut hosts = HashMap::default();
602 for participant in self.remote_participants.values() {
603 match participant.location {
604 ParticipantLocation::SharedProject { project_id } => {
605 *projects.entry(project_id).or_insert(0) += 1;
606 }
607 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
608 }
609 for project in &participant.projects {
610 *projects.entry(project.id).or_insert(0) += 1;
611 hosts.insert(project.id, participant.user.id);
612 }
613 }
614
615 let mut pairs: Vec<(u64, usize)> = projects.into_iter().collect();
616 pairs.sort_by_key(|(_, count)| 0 - *count as i32);
617
618 pairs
619 .into_iter()
620 .map(|(project_id, _)| (project_id, hosts[&project_id]))
621 .collect()
622 }
623
624 async fn handle_room_updated(
625 this: ModelHandle<Self>,
626 envelope: TypedEnvelope<proto::RoomUpdated>,
627 _: Arc<Client>,
628 mut cx: AsyncAppContext,
629 ) -> Result<()> {
630 let room = envelope
631 .payload
632 .room
633 .ok_or_else(|| anyhow!("invalid room"))?;
634 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
635 }
636
/// Reconciles local room state with the room description sent by the server.
///
/// The users for all remote and pending participants are requested from the
/// user store first; the actual reconciliation runs in a spawned task, stored
/// in `pending_room_update`, once those lookups finish. Storing the new task
/// replaces any previously stored one. This function itself always returns
/// `Ok(())`.
fn apply_room_update(
    &mut self,
    mut room: proto::Room,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    // Filter ourselves out from the room's participants.
    let local_participant_ix = room
        .participants
        .iter()
        .position(|participant| Some(participant.user_id) == self.client.user_id());
    let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

    let pending_participant_user_ids = room
        .pending_participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    let remote_participant_user_ids = room
        .participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    let (remote_participants, pending_participants) =
        self.user_store.update(cx, move |user_store, cx| {
            (
                user_store.get_users(remote_participant_user_ids, cx),
                user_store.get_users(pending_participant_user_ids, cx),
            )
        });

    self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
        let (remote_participants, pending_participants) =
            futures::join!(remote_participants, pending_participants);

        this.update(&mut cx, |this, cx| {
            this.participant_user_ids.clear();

            if let Some(participant) = local_participant {
                this.local_participant.projects = participant.projects;
            } else {
                this.local_participant.projects.clear();
            }

            // If the user lookup failed, the error is logged and the remote
            // participants are left as they were.
            if let Some(participants) = remote_participants.log_err() {
                // NOTE(review): pairing by position assumes `get_users` returns
                // users in the same order as the requested ids — confirm.
                for (participant, user) in room.participants.into_iter().zip(participants) {
                    // Participants without a peer id are skipped entirely.
                    let Some(peer_id) = participant.peer_id else {
                        continue;
                    };
                    this.participant_user_ids.insert(participant.user_id);

                    let old_projects = this
                        .remote_participants
                        .get(&participant.user_id)
                        .into_iter()
                        .flat_map(|existing| &existing.projects)
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();
                    let new_projects = participant
                        .projects
                        .iter()
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();

                    // Announce projects this participant did not list before.
                    for project in &participant.projects {
                        if !old_projects.contains(&project.id) {
                            cx.emit(Event::RemoteProjectShared {
                                owner: user.clone(),
                                project_id: project.id,
                                worktree_root_names: project.worktree_root_names.clone(),
                            });
                        }
                    }

                    // For projects no longer listed: disconnect and forget any
                    // joined copy, then announce the unshare. Joined projects
                    // whose handle is gone are dropped as well.
                    for unshared_project_id in old_projects.difference(&new_projects) {
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade(cx) {
                                project.update(cx, |project, cx| {
                                    if project.remote_id() == Some(*unshared_project_id) {
                                        project.disconnected_from_host(cx);
                                        false
                                    } else {
                                        true
                                    }
                                })
                            } else {
                                false
                            }
                        });
                        cx.emit(Event::RemoteProjectUnshared {
                            project_id: *unshared_project_id,
                        });
                    }

                    // A location that cannot be decoded is treated as external.
                    let location = ParticipantLocation::from_proto(participant.location)
                        .unwrap_or(ParticipantLocation::External);
                    if let Some(remote_participant) =
                        this.remote_participants.get_mut(&participant.user_id)
                    {
                        remote_participant.projects = participant.projects;
                        remote_participant.peer_id = peer_id;
                        if location != remote_participant.location {
                            remote_participant.location = location;
                            cx.emit(Event::ParticipantLocationChanged {
                                participant_id: peer_id,
                            });
                        }
                    } else {
                        // First time this participant is seen: record them
                        // (muted until an audio track says otherwise) and play
                        // the "joined" sound.
                        this.remote_participants.insert(
                            participant.user_id,
                            RemoteParticipant {
                                user: user.clone(),
                                participant_index: ParticipantIndex(
                                    participant.participant_index,
                                ),
                                peer_id,
                                projects: participant.projects,
                                location,
                                muted: true,
                                speaking: false,
                                video_tracks: Default::default(),
                                audio_tracks: Default::default(),
                            },
                        );

                        Audio::play_sound(Sound::Joined, cx);

                        // Pick up tracks LiveKit already holds for this user,
                        // replaying them as `Subscribed` updates.
                        if let Some(live_kit) = this.live_kit.as_ref() {
                            let video_tracks =
                                live_kit.room.remote_video_tracks(&user.id.to_string());
                            let audio_tracks =
                                live_kit.room.remote_audio_tracks(&user.id.to_string());
                            let publications = live_kit
                                .room
                                .remote_audio_track_publications(&user.id.to_string());

                            for track in video_tracks {
                                this.remote_video_track_updated(
                                    RemoteVideoTrackUpdate::Subscribed(track),
                                    cx,
                                )
                                .log_err();
                            }

                            for (track, publication) in
                                audio_tracks.iter().zip(publications.iter())
                            {
                                this.remote_audio_track_updated(
                                    RemoteAudioTrackUpdate::Subscribed(
                                        track.clone(),
                                        publication.clone(),
                                    ),
                                    cx,
                                )
                                .log_err();
                            }
                        }
                    }
                }

                // Drop participants absent from this update, announcing each
                // of their projects as unshared.
                this.remote_participants.retain(|user_id, participant| {
                    if this.participant_user_ids.contains(user_id) {
                        true
                    } else {
                        for project in &participant.projects {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: project.id,
                            });
                        }
                        false
                    }
                });
            }

            if let Some(pending_participants) = pending_participants.log_err() {
                this.pending_participants = pending_participants;
                for participant in &this.pending_participants {
                    this.participant_user_ids.insert(participant.id);
                }
            }

            // Rebuild the follower lists from scratch, without duplicates.
            this.follows_by_leader_id_project_id.clear();
            for follower in room.followers {
                let project_id = follower.project_id;
                let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                    (Some(leader), Some(follower)) => (leader, follower),

                    _ => {
                        log::error!("Follower message {follower:?} missing some state");
                        continue;
                    }
                };

                let list = this
                    .follows_by_leader_id_project_id
                    .entry((leader, project_id))
                    .or_insert(Vec::new());
                if !list.contains(&follower) {
                    list.push(follower);
                }
            }

            // The update is considered finished from here on, which is one of
            // the conditions `should_leave` checks.
            this.pending_room_update.take();
            if this.should_leave() {
                log::info!("room is empty, leaving");
                let _ = this.leave(cx);
            }

            this.user_store.update(cx, |user_store, cx| {
                let participant_indices_by_user_id = this
                    .remote_participants
                    .iter()
                    .map(|(user_id, participant)| (*user_id, participant.participant_index))
                    .collect();
                user_store.set_participant_indices(participant_indices_by_user_id, cx);
            });

            this.check_invariants();
            cx.notify();
        });
    }));

    cx.notify();
    Ok(())
}
863
864 fn remote_video_track_updated(
865 &mut self,
866 change: RemoteVideoTrackUpdate,
867 cx: &mut ModelContext<Self>,
868 ) -> Result<()> {
869 match change {
870 RemoteVideoTrackUpdate::Subscribed(track) => {
871 let user_id = track.publisher_id().parse()?;
872 let track_id = track.sid().to_string();
873 let participant = self
874 .remote_participants
875 .get_mut(&user_id)
876 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
877 participant.video_tracks.insert(
878 track_id.clone(),
879 Arc::new(RemoteVideoTrack {
880 live_kit_track: track,
881 }),
882 );
883 cx.emit(Event::RemoteVideoTracksChanged {
884 participant_id: participant.peer_id,
885 });
886 }
887 RemoteVideoTrackUpdate::Unsubscribed {
888 publisher_id,
889 track_id,
890 } => {
891 let user_id = publisher_id.parse()?;
892 let participant = self
893 .remote_participants
894 .get_mut(&user_id)
895 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
896 participant.video_tracks.remove(&track_id);
897 cx.emit(Event::RemoteVideoTracksChanged {
898 participant_id: participant.peer_id,
899 });
900 }
901 }
902
903 cx.notify();
904 Ok(())
905 }
906
907 fn remote_audio_track_updated(
908 &mut self,
909 change: RemoteAudioTrackUpdate,
910 cx: &mut ModelContext<Self>,
911 ) -> Result<()> {
912 match change {
913 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
914 let mut speaker_ids = speakers
915 .into_iter()
916 .filter_map(|speaker_sid| speaker_sid.parse().ok())
917 .collect::<Vec<u64>>();
918 speaker_ids.sort_unstable();
919 for (sid, participant) in &mut self.remote_participants {
920 if let Ok(_) = speaker_ids.binary_search(sid) {
921 participant.speaking = true;
922 } else {
923 participant.speaking = false;
924 }
925 }
926 if let Some(id) = self.client.user_id() {
927 if let Some(room) = &mut self.live_kit {
928 if let Ok(_) = speaker_ids.binary_search(&id) {
929 room.speaking = true;
930 } else {
931 room.speaking = false;
932 }
933 }
934 }
935 cx.notify();
936 }
937 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
938 let mut found = false;
939 for participant in &mut self.remote_participants.values_mut() {
940 for track in participant.audio_tracks.values() {
941 if track.sid() == track_id {
942 found = true;
943 break;
944 }
945 }
946 if found {
947 participant.muted = muted;
948 break;
949 }
950 }
951
952 cx.notify();
953 }
954 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
955 let user_id = track.publisher_id().parse()?;
956 let track_id = track.sid().to_string();
957 let participant = self
958 .remote_participants
959 .get_mut(&user_id)
960 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
961
962 participant.audio_tracks.insert(track_id.clone(), track);
963 participant.muted = publication.is_muted();
964
965 cx.emit(Event::RemoteAudioTracksChanged {
966 participant_id: participant.peer_id,
967 });
968 }
969 RemoteAudioTrackUpdate::Unsubscribed {
970 publisher_id,
971 track_id,
972 } => {
973 let user_id = publisher_id.parse()?;
974 let participant = self
975 .remote_participants
976 .get_mut(&user_id)
977 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
978 participant.audio_tracks.remove(&track_id);
979 cx.emit(Event::RemoteAudioTracksChanged {
980 participant_id: participant.peer_id,
981 });
982 }
983 }
984
985 cx.notify();
986 Ok(())
987 }
988
/// Asserts internal consistency of the participant bookkeeping. The checks are
/// compiled only for tests / the `test-support` feature; otherwise this is a
/// no-op.
///
/// Panics (in those builds) when a remote or pending participant is missing
/// from `participant_user_ids`, is the local user, or when the id set's size
/// differs from the number of remote plus pending participants. Also panics if
/// the client has no user id while any participant is present.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
1008
/// Sends a `Call` request inviting `called_user_id` into this room, optionally
/// pointing them at `initial_project_id`.
///
/// Returns a task failing with "room is offline" when the room is offline.
/// Otherwise the returned task resolves with the outcome of the request.
pub(crate) fn call(
    &mut self,
    called_user_id: u64,
    initial_project_id: Option<u64>,
    cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
    if self.status.is_offline() {
        return Task::ready(Err(anyhow!("room is offline")));
    }

    cx.notify();
    let client = self.client.clone();
    let room_id = self.id;
    // Counted while the request is in flight so `should_leave` does not report
    // an empty room in the meantime.
    self.pending_call_count += 1;
    cx.spawn(|this, mut cx| async move {
        let result = client
            .request(proto::Call {
                room_id,
                called_user_id,
                initial_project_id,
            })
            .await;
        // The counter is decremented whether or not the request succeeded, and
        // the room is left if nothing keeps it alive any more.
        this.update(&mut cx, |this, cx| {
            this.pending_call_count -= 1;
            if this.should_leave() {
                this.leave(cx).detach_and_log_err(cx);
            }
        });
        result?;
        Ok(())
    })
}
1041
1042 pub fn join_project(
1043 &mut self,
1044 id: u64,
1045 language_registry: Arc<LanguageRegistry>,
1046 fs: Arc<dyn Fs>,
1047 cx: &mut ModelContext<Self>,
1048 ) -> Task<Result<ModelHandle<Project>>> {
1049 let client = self.client.clone();
1050 let user_store = self.user_store.clone();
1051 cx.emit(Event::RemoteProjectJoined { project_id: id });
1052 cx.spawn(|this, mut cx| async move {
1053 let project =
1054 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1055
1056 this.update(&mut cx, |this, cx| {
1057 this.joined_projects.retain(|project| {
1058 if let Some(project) = project.upgrade(cx) {
1059 !project.read(cx).is_read_only()
1060 } else {
1061 false
1062 }
1063 });
1064 this.joined_projects.insert(project.downgrade());
1065 });
1066 Ok(project)
1067 })
1068 }
1069
1070 pub(crate) fn share_project(
1071 &mut self,
1072 project: ModelHandle<Project>,
1073 cx: &mut ModelContext<Self>,
1074 ) -> Task<Result<u64>> {
1075 if let Some(project_id) = project.read(cx).remote_id() {
1076 return Task::ready(Ok(project_id));
1077 }
1078
1079 let request = self.client.request(proto::ShareProject {
1080 room_id: self.id(),
1081 worktrees: project.read(cx).worktree_metadata_protos(cx),
1082 });
1083 cx.spawn(|this, mut cx| async move {
1084 let response = request.await?;
1085
1086 project.update(&mut cx, |project, cx| {
1087 project.shared(response.project_id, cx)
1088 })?;
1089
1090 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1091 this.update(&mut cx, |this, cx| {
1092 this.shared_projects.insert(project.downgrade());
1093 let active_project = this.local_participant.active_project.as_ref();
1094 if active_project.map_or(false, |location| *location == project) {
1095 this.set_location(Some(&project), cx)
1096 } else {
1097 Task::ready(Ok(()))
1098 }
1099 })
1100 .await?;
1101
1102 Ok(response.project_id)
1103 })
1104 }
1105
1106 pub(crate) fn unshare_project(
1107 &mut self,
1108 project: ModelHandle<Project>,
1109 cx: &mut ModelContext<Self>,
1110 ) -> Result<()> {
1111 let project_id = match project.read(cx).remote_id() {
1112 Some(project_id) => project_id,
1113 None => return Ok(()),
1114 };
1115
1116 self.client.send(proto::UnshareProject { project_id })?;
1117 project.update(cx, |this, cx| this.unshare(cx))
1118 }
1119
1120 pub(crate) fn set_location(
1121 &mut self,
1122 project: Option<&ModelHandle<Project>>,
1123 cx: &mut ModelContext<Self>,
1124 ) -> Task<Result<()>> {
1125 if self.status.is_offline() {
1126 return Task::ready(Err(anyhow!("room is offline")));
1127 }
1128
1129 let client = self.client.clone();
1130 let room_id = self.id;
1131 let location = if let Some(project) = project {
1132 self.local_participant.active_project = Some(project.downgrade());
1133 if let Some(project_id) = project.read(cx).remote_id() {
1134 proto::participant_location::Variant::SharedProject(
1135 proto::participant_location::SharedProject { id: project_id },
1136 )
1137 } else {
1138 proto::participant_location::Variant::UnsharedProject(
1139 proto::participant_location::UnsharedProject {},
1140 )
1141 }
1142 } else {
1143 self.local_participant.active_project = None;
1144 proto::participant_location::Variant::External(proto::participant_location::External {})
1145 };
1146
1147 cx.notify();
1148 cx.foreground().spawn(async move {
1149 client
1150 .request(proto::UpdateParticipantLocation {
1151 room_id,
1152 location: Some(proto::ParticipantLocation {
1153 variant: Some(location),
1154 }),
1155 })
1156 .await?;
1157 Ok(())
1158 })
1159 }
1160
1161 pub fn is_screen_sharing(&self) -> bool {
1162 self.live_kit.as_ref().map_or(false, |live_kit| {
1163 !matches!(live_kit.screen_track, LocalTrack::None)
1164 })
1165 }
1166
1167 pub fn is_sharing_mic(&self) -> bool {
1168 self.live_kit.as_ref().map_or(false, |live_kit| {
1169 !matches!(live_kit.microphone_track, LocalTrack::None)
1170 })
1171 }
1172
1173 pub fn is_muted(&self, cx: &AppContext) -> bool {
1174 self.live_kit
1175 .as_ref()
1176 .and_then(|live_kit| match &live_kit.microphone_track {
1177 LocalTrack::None => Some(Self::mute_on_join(cx)),
1178 LocalTrack::Pending { muted, .. } => Some(*muted),
1179 LocalTrack::Published { muted, .. } => Some(*muted),
1180 })
1181 .unwrap_or(false)
1182 }
1183
1184 pub fn is_speaking(&self) -> bool {
1185 self.live_kit
1186 .as_ref()
1187 .map_or(false, |live_kit| live_kit.speaking)
1188 }
1189
1190 pub fn is_deafened(&self) -> Option<bool> {
1191 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1192 }
1193
1194 #[track_caller]
1195 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1196 if self.status.is_offline() {
1197 return Task::ready(Err(anyhow!("room is offline")));
1198 } else if self.is_sharing_mic() {
1199 return Task::ready(Err(anyhow!("microphone was already shared")));
1200 }
1201
1202 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1203 let publish_id = post_inc(&mut live_kit.next_publish_id);
1204 live_kit.microphone_track = LocalTrack::Pending {
1205 publish_id,
1206 muted: false,
1207 };
1208 cx.notify();
1209 publish_id
1210 } else {
1211 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1212 };
1213
1214 cx.spawn_weak(|this, mut cx| async move {
1215 let publish_track = async {
1216 let track = LocalAudioTrack::create();
1217 this.upgrade(&cx)
1218 .ok_or_else(|| anyhow!("room was dropped"))?
1219 .read_with(&cx, |this, _| {
1220 this.live_kit
1221 .as_ref()
1222 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1223 })
1224 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1225 .await
1226 };
1227
1228 let publication = publish_track.await;
1229 this.upgrade(&cx)
1230 .ok_or_else(|| anyhow!("room was dropped"))?
1231 .update(&mut cx, |this, cx| {
1232 let live_kit = this
1233 .live_kit
1234 .as_mut()
1235 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1236
1237 let (canceled, muted) = if let LocalTrack::Pending {
1238 publish_id: cur_publish_id,
1239 muted,
1240 } = &live_kit.microphone_track
1241 {
1242 (*cur_publish_id != publish_id, *muted)
1243 } else {
1244 (true, false)
1245 };
1246
1247 match publication {
1248 Ok(publication) => {
1249 if canceled {
1250 live_kit.room.unpublish_track(publication);
1251 } else {
1252 if muted {
1253 cx.background().spawn(publication.set_mute(muted)).detach();
1254 }
1255 live_kit.microphone_track = LocalTrack::Published {
1256 track_publication: publication,
1257 muted,
1258 };
1259 cx.notify();
1260 }
1261 Ok(())
1262 }
1263 Err(error) => {
1264 if canceled {
1265 Ok(())
1266 } else {
1267 live_kit.microphone_track = LocalTrack::None;
1268 cx.notify();
1269 Err(error)
1270 }
1271 }
1272 }
1273 })
1274 })
1275 }
1276
1277 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1278 if self.status.is_offline() {
1279 return Task::ready(Err(anyhow!("room is offline")));
1280 } else if self.is_screen_sharing() {
1281 return Task::ready(Err(anyhow!("screen was already shared")));
1282 }
1283
1284 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1285 let publish_id = post_inc(&mut live_kit.next_publish_id);
1286 live_kit.screen_track = LocalTrack::Pending {
1287 publish_id,
1288 muted: false,
1289 };
1290 cx.notify();
1291 (live_kit.room.display_sources(), publish_id)
1292 } else {
1293 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1294 };
1295
1296 cx.spawn_weak(|this, mut cx| async move {
1297 let publish_track = async {
1298 let displays = displays.await?;
1299 let display = displays
1300 .first()
1301 .ok_or_else(|| anyhow!("no display found"))?;
1302 let track = LocalVideoTrack::screen_share_for_display(&display);
1303 this.upgrade(&cx)
1304 .ok_or_else(|| anyhow!("room was dropped"))?
1305 .read_with(&cx, |this, _| {
1306 this.live_kit
1307 .as_ref()
1308 .map(|live_kit| live_kit.room.publish_video_track(&track))
1309 })
1310 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1311 .await
1312 };
1313
1314 let publication = publish_track.await;
1315 this.upgrade(&cx)
1316 .ok_or_else(|| anyhow!("room was dropped"))?
1317 .update(&mut cx, |this, cx| {
1318 let live_kit = this
1319 .live_kit
1320 .as_mut()
1321 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1322
1323 let (canceled, muted) = if let LocalTrack::Pending {
1324 publish_id: cur_publish_id,
1325 muted,
1326 } = &live_kit.screen_track
1327 {
1328 (*cur_publish_id != publish_id, *muted)
1329 } else {
1330 (true, false)
1331 };
1332
1333 match publication {
1334 Ok(publication) => {
1335 if canceled {
1336 live_kit.room.unpublish_track(publication);
1337 } else {
1338 if muted {
1339 cx.background().spawn(publication.set_mute(muted)).detach();
1340 }
1341 live_kit.screen_track = LocalTrack::Published {
1342 track_publication: publication,
1343 muted,
1344 };
1345 cx.notify();
1346 }
1347
1348 Audio::play_sound(Sound::StartScreenshare, cx);
1349
1350 Ok(())
1351 }
1352 Err(error) => {
1353 if canceled {
1354 Ok(())
1355 } else {
1356 live_kit.screen_track = LocalTrack::None;
1357 cx.notify();
1358 Err(error)
1359 }
1360 }
1361 }
1362 })
1363 })
1364 }
1365
1366 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1367 let should_mute = !self.is_muted(cx);
1368 if let Some(live_kit) = self.live_kit.as_mut() {
1369 if matches!(live_kit.microphone_track, LocalTrack::None) {
1370 return Ok(self.share_microphone(cx));
1371 }
1372
1373 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1374 live_kit.muted_by_user = should_mute;
1375
1376 if old_muted == true && live_kit.deafened == true {
1377 if let Some(task) = self.toggle_deafen(cx).ok() {
1378 task.detach();
1379 }
1380 }
1381
1382 Ok(ret_task)
1383 } else {
1384 Err(anyhow!("LiveKit not started"))
1385 }
1386 }
1387
1388 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1389 if let Some(live_kit) = self.live_kit.as_mut() {
1390 (*live_kit).deafened = !live_kit.deafened;
1391
1392 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1393 // Context notification is sent within set_mute itself.
1394 let mut mute_task = None;
1395 // When deafening, mute user's mic as well.
1396 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1397 if live_kit.deafened || !live_kit.muted_by_user {
1398 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1399 };
1400 for participant in self.remote_participants.values() {
1401 for track in live_kit
1402 .room
1403 .remote_audio_track_publications(&participant.user.id.to_string())
1404 {
1405 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1406 }
1407 }
1408
1409 Ok(cx.foreground().spawn(async move {
1410 if let Some(mute_task) = mute_task {
1411 mute_task.await?;
1412 }
1413 for task in tasks {
1414 task.await?;
1415 }
1416 Ok(())
1417 }))
1418 } else {
1419 Err(anyhow!("LiveKit not started"))
1420 }
1421 }
1422
1423 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1424 if self.status.is_offline() {
1425 return Err(anyhow!("room is offline"));
1426 }
1427
1428 let live_kit = self
1429 .live_kit
1430 .as_mut()
1431 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1432 match mem::take(&mut live_kit.screen_track) {
1433 LocalTrack::None => Err(anyhow!("screen was not shared")),
1434 LocalTrack::Pending { .. } => {
1435 cx.notify();
1436 Ok(())
1437 }
1438 LocalTrack::Published {
1439 track_publication, ..
1440 } => {
1441 live_kit.room.unpublish_track(track_publication);
1442 cx.notify();
1443
1444 Audio::play_sound(Sound::StopScreenshare, cx);
1445 Ok(())
1446 }
1447 }
1448 }
1449
/// Test helper: replaces the display sources reported by the LiveKit room.
///
/// # Panics
///
/// Panics when LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1458}
1459
1460struct LiveKitRoom {
1461 room: Arc<live_kit_client::Room>,
1462 screen_track: LocalTrack,
1463 microphone_track: LocalTrack,
1464 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1465 muted_by_user: bool,
1466 deafened: bool,
1467 speaking: bool,
1468 next_publish_id: usize,
1469 _maintain_room: Task<()>,
1470 _maintain_tracks: [Task<()>; 2],
1471}
1472
1473impl LiveKitRoom {
1474 fn set_mute(
1475 self: &mut LiveKitRoom,
1476 should_mute: bool,
1477 cx: &mut ModelContext<Room>,
1478 ) -> Result<(Task<Result<()>>, bool)> {
1479 if !should_mute {
1480 // clear user muting state.
1481 self.muted_by_user = false;
1482 }
1483
1484 let (result, old_muted) = match &mut self.microphone_track {
1485 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1486 LocalTrack::Pending { muted, .. } => {
1487 let old_muted = *muted;
1488 *muted = should_mute;
1489 cx.notify();
1490 Ok((Task::Ready(Some(Ok(()))), old_muted))
1491 }
1492 LocalTrack::Published {
1493 track_publication,
1494 muted,
1495 } => {
1496 let old_muted = *muted;
1497 *muted = should_mute;
1498 cx.notify();
1499 Ok((
1500 cx.background().spawn(track_publication.set_mute(*muted)),
1501 old_muted,
1502 ))
1503 }
1504 }?;
1505
1506 if old_muted != should_mute {
1507 if should_mute {
1508 Audio::play_sound(Sound::Mute, cx);
1509 } else {
1510 Audio::play_sound(Sound::Unmute, cx);
1511 }
1512 }
1513
1514 Ok((result, old_muted))
1515 }
1516}
1517
1518enum LocalTrack {
1519 None,
1520 Pending {
1521 publish_id: usize,
1522 muted: bool,
1523 },
1524 Published {
1525 track_publication: LocalTrackPublication,
1526 muted: bool,
1527 },
1528}
1529
1530impl Default for LocalTrack {
1531 fn default() -> Self {
1532 Self::None
1533 }
1534}
1535
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        match self {
            Self::Offline => true,
            Self::Online | Self::Rejoining => false,
        }
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        match self {
            Self::Online => true,
            Self::Rejoining | Self::Offline => false,
        }
    }
}