1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before giving up and leaving it.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A remote participant's location differs from the previously recorded one.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track of the given remote participant was subscribed or unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// An audio track of the given remote participant was subscribed or unsubscribed.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant advertises a project that was not in their previous project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously advertised remote project disappeared, or its owner left the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::join_project` as soon as joining the project is requested.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// Not emitted in this part of the file; presumably signals that a project
    /// invitation was dismissed without joining — TODO confirm against the emitter.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
55
/// Client-side model of a call room: its participants, the projects shared in
/// it, and the tasks that keep it connected.
pub struct Room {
    /// Room id taken from the server's room message.
    id: u64,
    /// Channel id from the join response; `None` for rooms built by `Room::create`.
    channel_id: Option<u64>,
    /// Live-kit state; `None` when no connection info was supplied or after `clear_state`.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Local projects shared into this room (see `share_project`).
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Remote projects joined through this room (see `join_project`).
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed by the server as pending participants.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests currently in flight.
    pending_call_count: usize,
    /// Whether the room should be left once nobody else is in it (see `should_leave`).
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers of each `(leader, project id)` pair, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Holds the `RoomUpdated` message-handler subscription.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `None` once it has finished.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`.
    maintain_connection: Option<Task<Option<()>>>,
}
76
77impl Entity for Room {
78 type Event = Event;
79
80 fn release(&mut self, cx: &mut AppContext) {
81 if self.status.is_online() {
82 self.leave_internal(cx).detach_and_log_err(cx);
83 }
84 }
85
86 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
87 if self.status.is_online() {
88 let leave = self.leave_internal(cx);
89 Some(
90 cx.background()
91 .spawn(async move {
92 leave.await.log_err();
93 })
94 .boxed(),
95 )
96 } else {
97 None
98 }
99 }
100}
101
102impl Room {
    /// Returns the id of the channel this room belongs to, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
106
    /// Returns `true` while at least one project is tracked in `shared_projects`.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
110
111 #[cfg(any(test, feature = "test-support"))]
112 pub fn is_connected(&self) -> bool {
113 if let Some(live_kit) = self.live_kit.as_ref() {
114 matches!(
115 *live_kit.room.status().borrow(),
116 live_kit_client::ConnectionState::Connected { .. }
117 )
118 } else {
119 false
120 }
121 }
122
123 fn new(
124 id: u64,
125 channel_id: Option<u64>,
126 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
127 client: Arc<Client>,
128 user_store: ModelHandle<UserStore>,
129 cx: &mut ModelContext<Self>,
130 ) -> Self {
131 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
132 let room = live_kit_client::Room::new();
133 let mut status = room.status();
134 // Consume the initial status of the room.
135 let _ = status.try_recv();
136 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
137 while let Some(status) = status.next().await {
138 let this = if let Some(this) = this.upgrade(&cx) {
139 this
140 } else {
141 break;
142 };
143
144 if status == live_kit_client::ConnectionState::Disconnected {
145 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
146 break;
147 }
148 }
149 });
150
151 let mut track_video_changes = room.remote_video_track_updates();
152 let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
153 while let Some(track_change) = track_video_changes.next().await {
154 let this = if let Some(this) = this.upgrade(&cx) {
155 this
156 } else {
157 break;
158 };
159
160 this.update(&mut cx, |this, cx| {
161 this.remote_video_track_updated(track_change, cx).log_err()
162 });
163 }
164 });
165
166 let mut track_audio_changes = room.remote_audio_track_updates();
167 let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
168 while let Some(track_change) = track_audio_changes.next().await {
169 let this = if let Some(this) = this.upgrade(&cx) {
170 this
171 } else {
172 break;
173 };
174
175 this.update(&mut cx, |this, cx| {
176 this.remote_audio_track_updated(track_change, cx).log_err()
177 });
178 }
179 });
180
181 let connect = room.connect(&connection_info.server_url, &connection_info.token);
182 cx.spawn(|this, mut cx| async move {
183 connect.await?;
184
185 if !cx.read(Self::mute_on_join) {
186 this.update(&mut cx, |this, cx| this.share_microphone(cx))
187 .await?;
188 }
189
190 anyhow::Ok(())
191 })
192 .detach_and_log_err(cx);
193
194 Some(LiveKitRoom {
195 room,
196 screen_track: LocalTrack::None,
197 microphone_track: LocalTrack::None,
198 next_publish_id: 0,
199 muted_by_user: false,
200 deafened: false,
201 speaking: false,
202 _maintain_room,
203 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
204 })
205 } else {
206 None
207 };
208
209 let maintain_connection =
210 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
211
212 Audio::play_sound(Sound::Joined, cx);
213
214 Self {
215 id,
216 channel_id,
217 live_kit: live_kit_room,
218 status: RoomStatus::Online,
219 shared_projects: Default::default(),
220 joined_projects: Default::default(),
221 participant_user_ids: Default::default(),
222 local_participant: Default::default(),
223 remote_participants: Default::default(),
224 pending_participants: Default::default(),
225 pending_call_count: 0,
226 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
227 leave_when_empty: false,
228 pending_room_update: None,
229 client,
230 user_store,
231 follows_by_leader_id_project_id: Default::default(),
232 maintain_connection: Some(maintain_connection),
233 }
234 }
235
236 pub(crate) fn create(
237 called_user_id: u64,
238 initial_project: Option<ModelHandle<Project>>,
239 client: Arc<Client>,
240 user_store: ModelHandle<UserStore>,
241 cx: &mut AppContext,
242 ) -> Task<Result<ModelHandle<Self>>> {
243 cx.spawn(|mut cx| async move {
244 let response = client.request(proto::CreateRoom {}).await?;
245 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
246 let room = cx.add_model(|cx| {
247 Self::new(
248 room_proto.id,
249 None,
250 response.live_kit_connection_info,
251 client,
252 user_store,
253 cx,
254 )
255 });
256
257 let initial_project_id = if let Some(initial_project) = initial_project {
258 let initial_project_id = room
259 .update(&mut cx, |room, cx| {
260 room.share_project(initial_project.clone(), cx)
261 })
262 .await?;
263 Some(initial_project_id)
264 } else {
265 None
266 };
267
268 match room
269 .update(&mut cx, |room, cx| {
270 room.leave_when_empty = true;
271 room.call(called_user_id, initial_project_id, cx)
272 })
273 .await
274 {
275 Ok(()) => Ok(room),
276 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
277 }
278 })
279 }
280
281 pub(crate) fn join_channel(
282 channel_id: u64,
283 client: Arc<Client>,
284 user_store: ModelHandle<UserStore>,
285 cx: &mut AppContext,
286 ) -> Task<Result<ModelHandle<Self>>> {
287 cx.spawn(|cx| async move {
288 Self::from_join_response(
289 client.request(proto::JoinChannel { channel_id }).await?,
290 client,
291 user_store,
292 cx,
293 )
294 })
295 }
296
297 pub(crate) fn join(
298 call: &IncomingCall,
299 client: Arc<Client>,
300 user_store: ModelHandle<UserStore>,
301 cx: &mut AppContext,
302 ) -> Task<Result<ModelHandle<Self>>> {
303 let id = call.room_id;
304 cx.spawn(|cx| async move {
305 Self::from_join_response(
306 client.request(proto::JoinRoom { id }).await?,
307 client,
308 user_store,
309 cx,
310 )
311 })
312 }
313
314 pub fn mute_on_join(cx: &AppContext) -> bool {
315 settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
316 }
317
318 fn from_join_response(
319 response: proto::JoinRoomResponse,
320 client: Arc<Client>,
321 user_store: ModelHandle<UserStore>,
322 mut cx: AsyncAppContext,
323 ) -> Result<ModelHandle<Self>> {
324 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
325 let room = cx.add_model(|cx| {
326 Self::new(
327 room_proto.id,
328 response.channel_id,
329 response.live_kit_connection_info,
330 client,
331 user_store,
332 cx,
333 )
334 });
335 room.update(&mut cx, |room, cx| {
336 room.leave_when_empty = room.channel_id.is_none();
337 room.apply_room_update(room_proto, cx)?;
338 anyhow::Ok(())
339 })?;
340 Ok(room)
341 }
342
343 fn should_leave(&self) -> bool {
344 self.leave_when_empty
345 && self.pending_room_update.is_none()
346 && self.pending_participants.is_empty()
347 && self.remote_participants.is_empty()
348 && self.pending_call_count == 0
349 }
350
    /// Notifies observers, emits [`Event::Left`] and then leaves the room via
    /// `leave_internal`.
    ///
    /// The notification and event are emitted before `leave_internal` runs its
    /// offline check, so they fire even when the returned task resolves to the
    /// "room is offline" error.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
356
357 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
358 if self.status.is_offline() {
359 return Task::ready(Err(anyhow!("room is offline")));
360 }
361
362 log::info!("leaving room");
363 Audio::play_sound(Sound::Leave, cx);
364
365 self.clear_state(cx);
366
367 let leave_room = self.client.request(proto::LeaveRoom {});
368 cx.background().spawn(async move {
369 leave_room.await?;
370 anyhow::Ok(())
371 })
372 }
373
374 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
375 for project in self.shared_projects.drain() {
376 if let Some(project) = project.upgrade(cx) {
377 project.update(cx, |project, cx| {
378 project.unshare(cx).log_err();
379 });
380 }
381 }
382 for project in self.joined_projects.drain() {
383 if let Some(project) = project.upgrade(cx) {
384 project.update(cx, |project, cx| {
385 project.disconnected_from_host(cx);
386 project.close(cx);
387 });
388 }
389 }
390
391 self.status = RoomStatus::Offline;
392 self.remote_participants.clear();
393 self.pending_participants.clear();
394 self.participant_user_ids.clear();
395 self.subscriptions.clear();
396 self.live_kit.take();
397 self.pending_room_update.take();
398 self.maintain_connection.take();
399 }
400
401 async fn maintain_connection(
402 this: WeakModelHandle<Self>,
403 client: Arc<Client>,
404 mut cx: AsyncAppContext,
405 ) -> Result<()> {
406 let mut client_status = client.status();
407 loop {
408 let _ = client_status.try_recv();
409 let is_connected = client_status.borrow().is_connected();
410 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
411 if !is_connected || client_status.next().await.is_some() {
412 log::info!("detected client disconnection");
413
414 this.upgrade(&cx)
415 .ok_or_else(|| anyhow!("room was dropped"))?
416 .update(&mut cx, |this, cx| {
417 this.status = RoomStatus::Rejoining;
418 cx.notify();
419 });
420
421 // Wait for client to re-establish a connection to the server.
422 {
423 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
424 let client_reconnection = async {
425 let mut remaining_attempts = 3;
426 while remaining_attempts > 0 {
427 if client_status.borrow().is_connected() {
428 log::info!("client reconnected, attempting to rejoin room");
429
430 let Some(this) = this.upgrade(&cx) else { break };
431 if this
432 .update(&mut cx, |this, cx| this.rejoin(cx))
433 .await
434 .log_err()
435 .is_some()
436 {
437 return true;
438 } else {
439 remaining_attempts -= 1;
440 }
441 } else if client_status.borrow().is_signed_out() {
442 return false;
443 }
444
445 log::info!(
446 "waiting for client status change, remaining attempts {}",
447 remaining_attempts
448 );
449 client_status.next().await;
450 }
451 false
452 }
453 .fuse();
454 futures::pin_mut!(client_reconnection);
455
456 futures::select_biased! {
457 reconnected = client_reconnection => {
458 if reconnected {
459 log::info!("successfully reconnected to room");
460 // If we successfully joined the room, go back around the loop
461 // waiting for future connection status changes.
462 continue;
463 }
464 }
465 _ = reconnection_timeout => {
466 log::info!("room reconnection timeout expired");
467 }
468 }
469 }
470
471 break;
472 }
473 }
474
475 // The client failed to re-establish a connection to the server
476 // or an error occurred while trying to re-join the room. Either way
477 // we leave the room and return an error.
478 if let Some(this) = this.upgrade(&cx) {
479 log::info!("reconnection failed, leaving room");
480 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
481 }
482 Err(anyhow!(
483 "can't reconnect to room: client failed to re-establish connection"
484 ))
485 }
486
487 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
488 let mut projects = HashMap::default();
489 let mut reshared_projects = Vec::new();
490 let mut rejoined_projects = Vec::new();
491 self.shared_projects.retain(|project| {
492 if let Some(handle) = project.upgrade(cx) {
493 let project = handle.read(cx);
494 if let Some(project_id) = project.remote_id() {
495 projects.insert(project_id, handle.clone());
496 reshared_projects.push(proto::UpdateProject {
497 project_id,
498 worktrees: project.worktree_metadata_protos(cx),
499 });
500 return true;
501 }
502 }
503 false
504 });
505 self.joined_projects.retain(|project| {
506 if let Some(handle) = project.upgrade(cx) {
507 let project = handle.read(cx);
508 if let Some(project_id) = project.remote_id() {
509 projects.insert(project_id, handle.clone());
510 rejoined_projects.push(proto::RejoinProject {
511 id: project_id,
512 worktrees: project
513 .worktrees(cx)
514 .map(|worktree| {
515 let worktree = worktree.read(cx);
516 proto::RejoinWorktree {
517 id: worktree.id().to_proto(),
518 scan_id: worktree.completed_scan_id() as u64,
519 }
520 })
521 .collect(),
522 });
523 }
524 return true;
525 }
526 false
527 });
528
529 let response = self.client.request_envelope(proto::RejoinRoom {
530 id: self.id,
531 reshared_projects,
532 rejoined_projects,
533 });
534
535 cx.spawn(|this, mut cx| async move {
536 let response = response.await?;
537 let message_id = response.message_id;
538 let response = response.payload;
539 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
540 this.update(&mut cx, |this, cx| {
541 this.status = RoomStatus::Online;
542 this.apply_room_update(room_proto, cx)?;
543
544 for reshared_project in response.reshared_projects {
545 if let Some(project) = projects.get(&reshared_project.id) {
546 project.update(cx, |project, cx| {
547 project.reshared(reshared_project, cx).log_err();
548 });
549 }
550 }
551
552 for rejoined_project in response.rejoined_projects {
553 if let Some(project) = projects.get(&rejoined_project.id) {
554 project.update(cx, |project, cx| {
555 project.rejoined(rejoined_project, message_id, cx).log_err();
556 });
557 }
558 }
559
560 anyhow::Ok(())
561 })
562 })
563 }
564
    /// Returns the id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
568
    /// Returns the current status of the room.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
572
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
576
    /// Returns the remote participants, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
580
581 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
582 self.remote_participants
583 .values()
584 .find(|p| p.peer_id == peer_id)
585 }
586
    /// Returns the users currently listed as pending participants.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
590
    /// Returns `true` when `user_id` is a known remote or pending participant.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
594
595 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
596 self.follows_by_leader_id_project_id
597 .get(&(leader_id, project_id))
598 .map_or(&[], |v| v.as_slice())
599 }
600
601 /// Returns the most 'active' projects, defined as most people in the project
602 pub fn most_active_project(&self) -> Option<(u64, u64)> {
603 let mut projects = HashMap::default();
604 let mut hosts = HashMap::default();
605 for participant in self.remote_participants.values() {
606 match participant.location {
607 ParticipantLocation::SharedProject { project_id } => {
608 *projects.entry(project_id).or_insert(0) += 1;
609 }
610 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
611 }
612 for project in &participant.projects {
613 *projects.entry(project.id).or_insert(0) += 1;
614 hosts.insert(project.id, participant.user.id);
615 }
616 }
617
618 let mut pairs: Vec<(u64, usize)> = projects.into_iter().collect();
619 pairs.sort_by_key(|(_, count)| *count as i32);
620
621 pairs
622 .first()
623 .map(|(project_id, _)| (*project_id, hosts[&project_id]))
624 }
625
626 async fn handle_room_updated(
627 this: ModelHandle<Self>,
628 envelope: TypedEnvelope<proto::RoomUpdated>,
629 _: Arc<Client>,
630 mut cx: AsyncAppContext,
631 ) -> Result<()> {
632 let room = envelope
633 .payload
634 .room
635 .ok_or_else(|| anyhow!("invalid room"))?;
636 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
637 }
638
    /// Schedules the application of a new server-side room state.
    ///
    /// The users for all remote and pending participants are requested from
    /// the user store; once both lookups finish, the spawned task rebuilds the
    /// participant collections and follower map, emits the corresponding
    /// events, and leaves the room if `should_leave` holds. The task is stored
    /// in `pending_room_update`, replacing any previously stored task.
    ///
    /// Always returns `Ok(())` after scheduling; lookup failures are logged
    /// inside the task.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Collected after the removal above, so the ids are in the same order as `room.participants`.
        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): pairing by position assumes `get_users` returns the users
                    // in the order of the requested ids — confirm against the user store.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        // Participants without a peer id are ignored entirely.
                        let Some(peer_id) = participant.peer_id else {
                            continue;
                        };
                        this.participant_user_ids.insert(participant.user_id);

                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        // Announce projects this participant did not advertise before.
                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        // Projects that disappeared: disconnect and forget any joined copy,
                        // then announce the unshare.
                        for unshared_project_id in old_projects.difference(&new_projects) {
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade(cx) {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // An unparsable location is treated as `External`.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            // Known participant: refresh projects and peer id, report location changes.
                            remote_participant.projects = participant.projects;
                            remote_participant.peer_id = peer_id;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            // New participant: starts out muted and not speaking, with no tracks.
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    participant_index: ParticipantIndex(
                                        participant.participant_index,
                                    ),
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    muted: true,
                                    speaking: false,
                                    video_tracks: Default::default(),
                                    audio_tracks: Default::default(),
                                },
                            );

                            Audio::play_sound(Sound::Joined, cx);

                            // Register tracks live-kit already knows for this user, since the
                            // participant was unknown when those tracks were first reported.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let video_tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                let audio_tracks =
                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
                                let publications = live_kit
                                    .room
                                    .remote_audio_track_publications(&user.id.to_string());

                                for track in video_tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }

                                for (track, publication) in
                                    audio_tracks.iter().zip(publications.iter())
                                {
                                    this.remote_audio_track_updated(
                                        RemoteAudioTrackUpdate::Subscribed(
                                            track.clone(),
                                            publication.clone(),
                                        ),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants missing from this update and announce their
                    // projects as unshared. At this point `participant_user_ids` holds
                    // only the remote participants seen above.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower map from scratch, skipping incomplete entries
                // and duplicate followers.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_insert(Vec::new());
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the pending marker first: `should_leave` requires that no
                // room update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.user_store.update(cx, |user_store, cx| {
                    let participant_indices_by_user_id = this
                        .remote_participants
                        .iter()
                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
                        .collect();
                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
                });

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
865
866 fn remote_video_track_updated(
867 &mut self,
868 change: RemoteVideoTrackUpdate,
869 cx: &mut ModelContext<Self>,
870 ) -> Result<()> {
871 match change {
872 RemoteVideoTrackUpdate::Subscribed(track) => {
873 let user_id = track.publisher_id().parse()?;
874 let track_id = track.sid().to_string();
875 let participant = self
876 .remote_participants
877 .get_mut(&user_id)
878 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
879 participant.video_tracks.insert(
880 track_id.clone(),
881 Arc::new(RemoteVideoTrack {
882 live_kit_track: track,
883 }),
884 );
885 cx.emit(Event::RemoteVideoTracksChanged {
886 participant_id: participant.peer_id,
887 });
888 }
889 RemoteVideoTrackUpdate::Unsubscribed {
890 publisher_id,
891 track_id,
892 } => {
893 let user_id = publisher_id.parse()?;
894 let participant = self
895 .remote_participants
896 .get_mut(&user_id)
897 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
898 participant.video_tracks.remove(&track_id);
899 cx.emit(Event::RemoteVideoTracksChanged {
900 participant_id: participant.peer_id,
901 });
902 }
903 }
904
905 cx.notify();
906 Ok(())
907 }
908
909 fn remote_audio_track_updated(
910 &mut self,
911 change: RemoteAudioTrackUpdate,
912 cx: &mut ModelContext<Self>,
913 ) -> Result<()> {
914 match change {
915 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
916 let mut speaker_ids = speakers
917 .into_iter()
918 .filter_map(|speaker_sid| speaker_sid.parse().ok())
919 .collect::<Vec<u64>>();
920 speaker_ids.sort_unstable();
921 for (sid, participant) in &mut self.remote_participants {
922 if let Ok(_) = speaker_ids.binary_search(sid) {
923 participant.speaking = true;
924 } else {
925 participant.speaking = false;
926 }
927 }
928 if let Some(id) = self.client.user_id() {
929 if let Some(room) = &mut self.live_kit {
930 if let Ok(_) = speaker_ids.binary_search(&id) {
931 room.speaking = true;
932 } else {
933 room.speaking = false;
934 }
935 }
936 }
937 cx.notify();
938 }
939 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
940 let mut found = false;
941 for participant in &mut self.remote_participants.values_mut() {
942 for track in participant.audio_tracks.values() {
943 if track.sid() == track_id {
944 found = true;
945 break;
946 }
947 }
948 if found {
949 participant.muted = muted;
950 break;
951 }
952 }
953
954 cx.notify();
955 }
956 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
957 let user_id = track.publisher_id().parse()?;
958 let track_id = track.sid().to_string();
959 let participant = self
960 .remote_participants
961 .get_mut(&user_id)
962 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
963
964 participant.audio_tracks.insert(track_id.clone(), track);
965 participant.muted = publication.is_muted();
966
967 cx.emit(Event::RemoteAudioTracksChanged {
968 participant_id: participant.peer_id,
969 });
970 }
971 RemoteAudioTrackUpdate::Unsubscribed {
972 publisher_id,
973 track_id,
974 } => {
975 let user_id = publisher_id.parse()?;
976 let participant = self
977 .remote_participants
978 .get_mut(&user_id)
979 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
980 participant.audio_tracks.remove(&track_id);
981 cx.emit(Event::RemoteAudioTracksChanged {
982 participant_id: participant.peer_id,
983 });
984 }
985 }
986
987 cx.notify();
988 Ok(())
989 }
990
    /// Asserts internal consistency of the participant collections.
    ///
    /// The checks are only compiled for tests / the `test-support` feature;
    /// otherwise this function does nothing.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked by id and is never the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for pending participants.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // No user is both remote and pending, and no stray ids are tracked.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
1010
1011 pub(crate) fn call(
1012 &mut self,
1013 called_user_id: u64,
1014 initial_project_id: Option<u64>,
1015 cx: &mut ModelContext<Self>,
1016 ) -> Task<Result<()>> {
1017 if self.status.is_offline() {
1018 return Task::ready(Err(anyhow!("room is offline")));
1019 }
1020
1021 cx.notify();
1022 let client = self.client.clone();
1023 let room_id = self.id;
1024 self.pending_call_count += 1;
1025 cx.spawn(|this, mut cx| async move {
1026 let result = client
1027 .request(proto::Call {
1028 room_id,
1029 called_user_id,
1030 initial_project_id,
1031 })
1032 .await;
1033 this.update(&mut cx, |this, cx| {
1034 this.pending_call_count -= 1;
1035 if this.should_leave() {
1036 this.leave(cx).detach_and_log_err(cx);
1037 }
1038 });
1039 result?;
1040 Ok(())
1041 })
1042 }
1043
1044 pub fn join_project(
1045 &mut self,
1046 id: u64,
1047 language_registry: Arc<LanguageRegistry>,
1048 fs: Arc<dyn Fs>,
1049 cx: &mut ModelContext<Self>,
1050 ) -> Task<Result<ModelHandle<Project>>> {
1051 let client = self.client.clone();
1052 let user_store = self.user_store.clone();
1053 cx.emit(Event::RemoteProjectJoined { project_id: id });
1054 cx.spawn(|this, mut cx| async move {
1055 let project =
1056 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1057
1058 this.update(&mut cx, |this, cx| {
1059 this.joined_projects.retain(|project| {
1060 if let Some(project) = project.upgrade(cx) {
1061 !project.read(cx).is_read_only()
1062 } else {
1063 false
1064 }
1065 });
1066 this.joined_projects.insert(project.downgrade());
1067 });
1068 Ok(project)
1069 })
1070 }
1071
1072 pub(crate) fn share_project(
1073 &mut self,
1074 project: ModelHandle<Project>,
1075 cx: &mut ModelContext<Self>,
1076 ) -> Task<Result<u64>> {
1077 if let Some(project_id) = project.read(cx).remote_id() {
1078 return Task::ready(Ok(project_id));
1079 }
1080
1081 let request = self.client.request(proto::ShareProject {
1082 room_id: self.id(),
1083 worktrees: project.read(cx).worktree_metadata_protos(cx),
1084 });
1085 cx.spawn(|this, mut cx| async move {
1086 let response = request.await?;
1087
1088 project.update(&mut cx, |project, cx| {
1089 project.shared(response.project_id, cx)
1090 })?;
1091
1092 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1093 this.update(&mut cx, |this, cx| {
1094 this.shared_projects.insert(project.downgrade());
1095 let active_project = this.local_participant.active_project.as_ref();
1096 if active_project.map_or(false, |location| *location == project) {
1097 this.set_location(Some(&project), cx)
1098 } else {
1099 Task::ready(Ok(()))
1100 }
1101 })
1102 .await?;
1103
1104 Ok(response.project_id)
1105 })
1106 }
1107
1108 pub(crate) fn unshare_project(
1109 &mut self,
1110 project: ModelHandle<Project>,
1111 cx: &mut ModelContext<Self>,
1112 ) -> Result<()> {
1113 let project_id = match project.read(cx).remote_id() {
1114 Some(project_id) => project_id,
1115 None => return Ok(()),
1116 };
1117
1118 self.client.send(proto::UnshareProject { project_id })?;
1119 project.update(cx, |this, cx| this.unshare(cx))
1120 }
1121
1122 pub(crate) fn set_location(
1123 &mut self,
1124 project: Option<&ModelHandle<Project>>,
1125 cx: &mut ModelContext<Self>,
1126 ) -> Task<Result<()>> {
1127 if self.status.is_offline() {
1128 return Task::ready(Err(anyhow!("room is offline")));
1129 }
1130
1131 let client = self.client.clone();
1132 let room_id = self.id;
1133 let location = if let Some(project) = project {
1134 self.local_participant.active_project = Some(project.downgrade());
1135 if let Some(project_id) = project.read(cx).remote_id() {
1136 proto::participant_location::Variant::SharedProject(
1137 proto::participant_location::SharedProject { id: project_id },
1138 )
1139 } else {
1140 proto::participant_location::Variant::UnsharedProject(
1141 proto::participant_location::UnsharedProject {},
1142 )
1143 }
1144 } else {
1145 self.local_participant.active_project = None;
1146 proto::participant_location::Variant::External(proto::participant_location::External {})
1147 };
1148
1149 cx.notify();
1150 cx.foreground().spawn(async move {
1151 client
1152 .request(proto::UpdateParticipantLocation {
1153 room_id,
1154 location: Some(proto::ParticipantLocation {
1155 variant: Some(location),
1156 }),
1157 })
1158 .await?;
1159 Ok(())
1160 })
1161 }
1162
1163 pub fn is_screen_sharing(&self) -> bool {
1164 self.live_kit.as_ref().map_or(false, |live_kit| {
1165 !matches!(live_kit.screen_track, LocalTrack::None)
1166 })
1167 }
1168
1169 pub fn is_sharing_mic(&self) -> bool {
1170 self.live_kit.as_ref().map_or(false, |live_kit| {
1171 !matches!(live_kit.microphone_track, LocalTrack::None)
1172 })
1173 }
1174
1175 pub fn is_muted(&self, cx: &AppContext) -> bool {
1176 self.live_kit
1177 .as_ref()
1178 .and_then(|live_kit| match &live_kit.microphone_track {
1179 LocalTrack::None => Some(Self::mute_on_join(cx)),
1180 LocalTrack::Pending { muted, .. } => Some(*muted),
1181 LocalTrack::Published { muted, .. } => Some(*muted),
1182 })
1183 .unwrap_or(false)
1184 }
1185
1186 pub fn is_speaking(&self) -> bool {
1187 self.live_kit
1188 .as_ref()
1189 .map_or(false, |live_kit| live_kit.speaking)
1190 }
1191
1192 pub fn is_deafened(&self) -> Option<bool> {
1193 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1194 }
1195
1196 #[track_caller]
1197 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1198 if self.status.is_offline() {
1199 return Task::ready(Err(anyhow!("room is offline")));
1200 } else if self.is_sharing_mic() {
1201 return Task::ready(Err(anyhow!("microphone was already shared")));
1202 }
1203
1204 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1205 let publish_id = post_inc(&mut live_kit.next_publish_id);
1206 live_kit.microphone_track = LocalTrack::Pending {
1207 publish_id,
1208 muted: false,
1209 };
1210 cx.notify();
1211 publish_id
1212 } else {
1213 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1214 };
1215
1216 cx.spawn_weak(|this, mut cx| async move {
1217 let publish_track = async {
1218 let track = LocalAudioTrack::create();
1219 this.upgrade(&cx)
1220 .ok_or_else(|| anyhow!("room was dropped"))?
1221 .read_with(&cx, |this, _| {
1222 this.live_kit
1223 .as_ref()
1224 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1225 })
1226 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1227 .await
1228 };
1229
1230 let publication = publish_track.await;
1231 this.upgrade(&cx)
1232 .ok_or_else(|| anyhow!("room was dropped"))?
1233 .update(&mut cx, |this, cx| {
1234 let live_kit = this
1235 .live_kit
1236 .as_mut()
1237 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1238
1239 let (canceled, muted) = if let LocalTrack::Pending {
1240 publish_id: cur_publish_id,
1241 muted,
1242 } = &live_kit.microphone_track
1243 {
1244 (*cur_publish_id != publish_id, *muted)
1245 } else {
1246 (true, false)
1247 };
1248
1249 match publication {
1250 Ok(publication) => {
1251 if canceled {
1252 live_kit.room.unpublish_track(publication);
1253 } else {
1254 if muted {
1255 cx.background().spawn(publication.set_mute(muted)).detach();
1256 }
1257 live_kit.microphone_track = LocalTrack::Published {
1258 track_publication: publication,
1259 muted,
1260 };
1261 cx.notify();
1262 }
1263 Ok(())
1264 }
1265 Err(error) => {
1266 if canceled {
1267 Ok(())
1268 } else {
1269 live_kit.microphone_track = LocalTrack::None;
1270 cx.notify();
1271 Err(error)
1272 }
1273 }
1274 }
1275 })
1276 })
1277 }
1278
1279 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1280 if self.status.is_offline() {
1281 return Task::ready(Err(anyhow!("room is offline")));
1282 } else if self.is_screen_sharing() {
1283 return Task::ready(Err(anyhow!("screen was already shared")));
1284 }
1285
1286 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1287 let publish_id = post_inc(&mut live_kit.next_publish_id);
1288 live_kit.screen_track = LocalTrack::Pending {
1289 publish_id,
1290 muted: false,
1291 };
1292 cx.notify();
1293 (live_kit.room.display_sources(), publish_id)
1294 } else {
1295 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1296 };
1297
1298 cx.spawn_weak(|this, mut cx| async move {
1299 let publish_track = async {
1300 let displays = displays.await?;
1301 let display = displays
1302 .first()
1303 .ok_or_else(|| anyhow!("no display found"))?;
1304 let track = LocalVideoTrack::screen_share_for_display(&display);
1305 this.upgrade(&cx)
1306 .ok_or_else(|| anyhow!("room was dropped"))?
1307 .read_with(&cx, |this, _| {
1308 this.live_kit
1309 .as_ref()
1310 .map(|live_kit| live_kit.room.publish_video_track(&track))
1311 })
1312 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1313 .await
1314 };
1315
1316 let publication = publish_track.await;
1317 this.upgrade(&cx)
1318 .ok_or_else(|| anyhow!("room was dropped"))?
1319 .update(&mut cx, |this, cx| {
1320 let live_kit = this
1321 .live_kit
1322 .as_mut()
1323 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1324
1325 let (canceled, muted) = if let LocalTrack::Pending {
1326 publish_id: cur_publish_id,
1327 muted,
1328 } = &live_kit.screen_track
1329 {
1330 (*cur_publish_id != publish_id, *muted)
1331 } else {
1332 (true, false)
1333 };
1334
1335 match publication {
1336 Ok(publication) => {
1337 if canceled {
1338 live_kit.room.unpublish_track(publication);
1339 } else {
1340 if muted {
1341 cx.background().spawn(publication.set_mute(muted)).detach();
1342 }
1343 live_kit.screen_track = LocalTrack::Published {
1344 track_publication: publication,
1345 muted,
1346 };
1347 cx.notify();
1348 }
1349
1350 Audio::play_sound(Sound::StartScreenshare, cx);
1351
1352 Ok(())
1353 }
1354 Err(error) => {
1355 if canceled {
1356 Ok(())
1357 } else {
1358 live_kit.screen_track = LocalTrack::None;
1359 cx.notify();
1360 Err(error)
1361 }
1362 }
1363 }
1364 })
1365 })
1366 }
1367
1368 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1369 let should_mute = !self.is_muted(cx);
1370 if let Some(live_kit) = self.live_kit.as_mut() {
1371 if matches!(live_kit.microphone_track, LocalTrack::None) {
1372 return Ok(self.share_microphone(cx));
1373 }
1374
1375 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1376 live_kit.muted_by_user = should_mute;
1377
1378 if old_muted == true && live_kit.deafened == true {
1379 if let Some(task) = self.toggle_deafen(cx).ok() {
1380 task.detach();
1381 }
1382 }
1383
1384 Ok(ret_task)
1385 } else {
1386 Err(anyhow!("LiveKit not started"))
1387 }
1388 }
1389
1390 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1391 if let Some(live_kit) = self.live_kit.as_mut() {
1392 (*live_kit).deafened = !live_kit.deafened;
1393
1394 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1395 // Context notification is sent within set_mute itself.
1396 let mut mute_task = None;
1397 // When deafening, mute user's mic as well.
1398 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1399 if live_kit.deafened || !live_kit.muted_by_user {
1400 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1401 };
1402 for participant in self.remote_participants.values() {
1403 for track in live_kit
1404 .room
1405 .remote_audio_track_publications(&participant.user.id.to_string())
1406 {
1407 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1408 }
1409 }
1410
1411 Ok(cx.foreground().spawn(async move {
1412 if let Some(mute_task) = mute_task {
1413 mute_task.await?;
1414 }
1415 for task in tasks {
1416 task.await?;
1417 }
1418 Ok(())
1419 }))
1420 } else {
1421 Err(anyhow!("LiveKit not started"))
1422 }
1423 }
1424
1425 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1426 if self.status.is_offline() {
1427 return Err(anyhow!("room is offline"));
1428 }
1429
1430 let live_kit = self
1431 .live_kit
1432 .as_mut()
1433 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1434 match mem::take(&mut live_kit.screen_track) {
1435 LocalTrack::None => Err(anyhow!("screen was not shared")),
1436 LocalTrack::Pending { .. } => {
1437 cx.notify();
1438 Ok(())
1439 }
1440 LocalTrack::Published {
1441 track_publication, ..
1442 } => {
1443 live_kit.room.unpublish_track(track_publication);
1444 cx.notify();
1445
1446 Audio::play_sound(Sound::StopScreenshare, cx);
1447 Ok(())
1448 }
1449 }
1450 }
1451
/// Test-support helper that forwards `sources` to the underlying LiveKit
/// room's `set_display_sources`.
///
/// # Panics
///
/// Panics when LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1460}
1461
/// LiveKit-side state owned by a `Room`: the client room handle, the local
/// screen and microphone tracks, and the mute/deafen flags.
struct LiveKitRoom {
    /// Handle to the LiveKit client room, used to publish and unpublish
    /// local tracks and to look up remote audio track publications.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the current mute was requested by the user, as opposed to the
    /// automatic mute applied when deafening. `Room::toggle_mute` records the
    /// user's choice here, `set_mute` clears it on unmute, and
    /// `Room::toggle_deafen` reads it to decide whether undeafening should
    /// also unmute the microphone.
    muted_by_user: bool,
    /// Whether remote audio is currently disabled; flipped by
    /// `Room::toggle_deafen`.
    deafened: bool,
    /// Reported by `Room::is_speaking`. It is updated outside this section
    /// of the file.
    speaking: bool,
    /// Counter that is post-incremented to give each pending publish a unique
    /// id, so a publish that completes after being canceled can be detected.
    next_publish_id: usize,
    // NOTE(review): the two task fields below are presumably background
    // tasks kept alive for as long as this struct exists; confirm where the
    // struct is constructed.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1474
1475impl LiveKitRoom {
1476 fn set_mute(
1477 self: &mut LiveKitRoom,
1478 should_mute: bool,
1479 cx: &mut ModelContext<Room>,
1480 ) -> Result<(Task<Result<()>>, bool)> {
1481 if !should_mute {
1482 // clear user muting state.
1483 self.muted_by_user = false;
1484 }
1485
1486 let (result, old_muted) = match &mut self.microphone_track {
1487 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1488 LocalTrack::Pending { muted, .. } => {
1489 let old_muted = *muted;
1490 *muted = should_mute;
1491 cx.notify();
1492 Ok((Task::Ready(Some(Ok(()))), old_muted))
1493 }
1494 LocalTrack::Published {
1495 track_publication,
1496 muted,
1497 } => {
1498 let old_muted = *muted;
1499 *muted = should_mute;
1500 cx.notify();
1501 Ok((
1502 cx.background().spawn(track_publication.set_mute(*muted)),
1503 old_muted,
1504 ))
1505 }
1506 }?;
1507
1508 if old_muted != should_mute {
1509 if should_mute {
1510 Audio::play_sound(Sound::Mute, cx);
1511 } else {
1512 Audio::play_sound(Sound::Unmute, cx);
1513 }
1514 }
1515
1516 Ok((result, old_muted))
1517 }
1518}
1519
/// Publication state of a local track (microphone or screen).
enum LocalTrack {
    /// Nothing is being shared.
    None,
    /// A publish has been requested but has not completed yet.
    Pending {
        /// Id taken from `LiveKitRoom::next_publish_id` when the publish was
        /// requested; compared on completion to detect a canceled publish.
        publish_id: usize,
        /// Mute state to apply once the track is published.
        muted: bool,
    },
    /// The track has been published to the LiveKit room.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Current mute state of the published track.
        muted: bool,
    },
}
1531
1532impl Default for LocalTrack {
1533 fn default() -> Self {
1534 Self::None
1535 }
1536}
1537
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not
    /// count as online.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}