1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
/// Upper bound on how long the connection-maintenance loop waits for the client
/// to reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
/// Events emitted by a [`Room`] as its state changes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a location different from
    /// the one previously recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's video track was subscribed or unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's audio track was subscribed or unsubscribed.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not
    /// in that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously known remote project is no longer listed, either because
    /// its owner dropped it or because the owner is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room (emitted by `Room::leave`).
    Left,
}
49
/// Client-side state of a call room: its participants, the projects shared
/// in it, and the optional LiveKit session used for audio/video.
pub struct Room {
    /// Room id taken from the server's room message.
    id: u64,
    /// Channel id reported by the join response; `None` for rooms made via `create`.
    channel_id: Option<u64>,
    /// LiveKit session; `None` when the server supplied no connection info
    /// or after the room state has been cleared.
    live_kit: Option<LiveKitRoom>,
    /// Whether the room is online, rejoining, or offline.
    status: RoomStatus,
    /// Local projects shared into this room.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Remote projects joined through this room.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed by the server as pending participants.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have been issued but not yet answered.
    pending_call_count: usize,
    /// When set, the room is left automatically once it has no participants
    /// and no outstanding work (see `should_leave`).
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers of each `(leader, project id)` pair, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Keeps the `RoomUpdated` message handler registered.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `None` when none is in flight.
    pending_room_update: Option<Task<()>>,
    /// Task watching the client's connection status and rejoining on reconnect.
    maintain_connection: Option<Task<Option<()>>>,
}
70
71impl Entity for Room {
72 type Event = Event;
73
74 fn release(&mut self, cx: &mut AppContext) {
75 if self.status.is_online() {
76 self.leave_internal(cx).detach_and_log_err(cx);
77 }
78 }
79
80 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
81 if self.status.is_online() {
82 let leave = self.leave_internal(cx);
83 Some(
84 cx.background()
85 .spawn(async move {
86 leave.await.log_err();
87 })
88 .boxed(),
89 )
90 } else {
91 None
92 }
93 }
94}
95
96impl Room {
    /// Returns the id of the channel this room belongs to, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
100
101 #[cfg(any(test, feature = "test-support"))]
102 pub fn is_connected(&self) -> bool {
103 if let Some(live_kit) = self.live_kit.as_ref() {
104 matches!(
105 *live_kit.room.status().borrow(),
106 live_kit_client::ConnectionState::Connected { .. }
107 )
108 } else {
109 false
110 }
111 }
112
    /// Builds the in-memory state for a room the client has just created or joined.
    ///
    /// When `live_kit_connection_info` is present, a LiveKit room is created,
    /// background tasks are spawned to follow its connection status and its
    /// remote video/audio track updates, and a connection attempt is started.
    /// A task that watches the client's own connection status is always
    /// spawned, the "joined" sound is played, and a handler for `RoomUpdated`
    /// messages is registered.
    fn new(
        id: u64,
        channel_id: Option<u64>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports a disconnection. The task
            // also stops once the room model can no longer be upgraded.
            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                        break;
                    }
                }
            });

            // Forward every remote video track change into the room state.
            let mut track_video_changes = room.remote_video_track_updates();
            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_video_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_video_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Forward every remote audio track change into the room state.
            let mut track_audio_changes = room.remote_audio_track_updates();
            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_audio_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_audio_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Connect to LiveKit and, unless the user should join muted, start
            // sharing the microphone once connected. Failures are only logged.
            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                if !cx.read(Self::mute_on_join) {
                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
                        .await?;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
            })
        } else {
            None
        };

        // Watch the client's connection status so the room can be rejoined
        // after a reconnect (see `maintain_connection`).
        let maintain_connection =
            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

        Audio::play_sound(Sound::Joined, cx);

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
        }
    }
225
226 pub(crate) fn create(
227 called_user_id: u64,
228 initial_project: Option<ModelHandle<Project>>,
229 client: Arc<Client>,
230 user_store: ModelHandle<UserStore>,
231 cx: &mut AppContext,
232 ) -> Task<Result<ModelHandle<Self>>> {
233 cx.spawn(|mut cx| async move {
234 let response = client.request(proto::CreateRoom {}).await?;
235 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
236 let room = cx.add_model(|cx| {
237 Self::new(
238 room_proto.id,
239 None,
240 response.live_kit_connection_info,
241 client,
242 user_store,
243 cx,
244 )
245 });
246
247 let initial_project_id = if let Some(initial_project) = initial_project {
248 let initial_project_id = room
249 .update(&mut cx, |room, cx| {
250 room.share_project(initial_project.clone(), cx)
251 })
252 .await?;
253 Some(initial_project_id)
254 } else {
255 None
256 };
257
258 match room
259 .update(&mut cx, |room, cx| {
260 room.leave_when_empty = true;
261 room.call(called_user_id, initial_project_id, cx)
262 })
263 .await
264 {
265 Ok(()) => Ok(room),
266 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
267 }
268 })
269 }
270
271 pub(crate) fn join_channel(
272 channel_id: u64,
273 client: Arc<Client>,
274 user_store: ModelHandle<UserStore>,
275 cx: &mut AppContext,
276 ) -> Task<Result<ModelHandle<Self>>> {
277 cx.spawn(|cx| async move {
278 Self::from_join_response(
279 client.request(proto::JoinChannel { channel_id }).await?,
280 client,
281 user_store,
282 cx,
283 )
284 })
285 }
286
287 pub(crate) fn join(
288 call: &IncomingCall,
289 client: Arc<Client>,
290 user_store: ModelHandle<UserStore>,
291 cx: &mut AppContext,
292 ) -> Task<Result<ModelHandle<Self>>> {
293 let id = call.room_id;
294 cx.spawn(|cx| async move {
295 Self::from_join_response(
296 client.request(proto::JoinRoom { id }).await?,
297 client,
298 user_store,
299 cx,
300 )
301 })
302 }
303
304 pub fn mute_on_join(cx: &AppContext) -> bool {
305 settings::get::<CallSettings>(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
306 }
307
308 fn from_join_response(
309 response: proto::JoinRoomResponse,
310 client: Arc<Client>,
311 user_store: ModelHandle<UserStore>,
312 mut cx: AsyncAppContext,
313 ) -> Result<ModelHandle<Self>> {
314 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
315 let room = cx.add_model(|cx| {
316 Self::new(
317 room_proto.id,
318 response.channel_id,
319 response.live_kit_connection_info,
320 client,
321 user_store,
322 cx,
323 )
324 });
325 room.update(&mut cx, |room, cx| {
326 room.leave_when_empty = room.channel_id.is_none();
327 room.apply_room_update(room_proto, cx)?;
328 anyhow::Ok(())
329 })?;
330 Ok(room)
331 }
332
333 fn should_leave(&self) -> bool {
334 self.leave_when_empty
335 && self.pending_room_update.is_none()
336 && self.pending_participants.is_empty()
337 && self.remote_participants.is_empty()
338 && self.pending_call_count == 0
339 }
340
    /// Leaves the room, notifying observers and emitting [`Event::Left`]
    /// before delegating to `leave_internal`.
    ///
    /// The notification and event are issued unconditionally; the returned
    /// task resolves to an error if the room was already offline.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
346
347 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
348 if self.status.is_offline() {
349 return Task::ready(Err(anyhow!("room is offline")));
350 }
351
352 log::info!("leaving room");
353 Audio::play_sound(Sound::Leave, cx);
354
355 self.clear_state(cx);
356
357 let leave_room = self.client.request(proto::LeaveRoom {});
358 cx.background().spawn(async move {
359 leave_room.await?;
360 anyhow::Ok(())
361 })
362 }
363
364 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
365 for project in self.shared_projects.drain() {
366 if let Some(project) = project.upgrade(cx) {
367 project.update(cx, |project, cx| {
368 project.unshare(cx).log_err();
369 });
370 }
371 }
372 for project in self.joined_projects.drain() {
373 if let Some(project) = project.upgrade(cx) {
374 project.update(cx, |project, cx| {
375 project.disconnected_from_host(cx);
376 project.close(cx);
377 });
378 }
379 }
380
381 self.status = RoomStatus::Offline;
382 self.remote_participants.clear();
383 self.pending_participants.clear();
384 self.participant_user_ids.clear();
385 self.subscriptions.clear();
386 self.live_kit.take();
387 self.pending_room_update.take();
388 self.maintain_connection.take();
389 }
390
    /// Watches the client's connection status for the lifetime of the room and
    /// tries to rejoin the room whenever the connection is lost and regained.
    ///
    /// After a status change (or if the client is not connected to begin
    /// with) the room is marked as `Rejoining`. The client then has
    /// `RECONNECT_TIMEOUT` to reconnect and rejoin; up to three rejoin
    /// attempts are made. On success the loop goes back to waiting for the
    /// next status change. If the timeout expires, the attempts are used up,
    /// or the client signs out, the room is left and an error is returned.
    /// An error is also returned if the room model has been dropped.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Discard any status that is already queued so that `next()` below
            // only resolves on a subsequent change.
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once the room has been rejoined, or to
                    // `false` when rejoining is given up on.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Race the rejoin attempts against the overall timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
476
    /// Asks the server to let this client back into the room after a
    /// reconnection, re-announcing the projects it shares and the projects it
    /// had joined.
    ///
    /// Project handles that can no longer be upgraded are dropped from the
    /// bookkeeping sets. When the response arrives the room is marked online,
    /// the room state from the response is applied, and each project named
    /// in the response is told that it was reshared or rejoined.
    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // Live project handles by remote id, used to route the response.
        let mut projects = HashMap::default();
        let mut reshared_projects = Vec::new();
        let mut rejoined_projects = Vec::new();
        // Shared projects are kept only if still alive AND still have a remote id.
        self.shared_projects.retain(|project| {
            if let Some(handle) = project.upgrade(cx) {
                let project = handle.read(cx);
                if let Some(project_id) = project.remote_id() {
                    projects.insert(project_id, handle.clone());
                    reshared_projects.push(proto::UpdateProject {
                        project_id,
                        worktrees: project.worktree_metadata_protos(cx),
                    });
                    return true;
                }
            }
            false
        });
        // Joined projects are kept whenever they are still alive; only those
        // with a remote id are included in the request.
        self.joined_projects.retain(|project| {
            if let Some(handle) = project.upgrade(cx) {
                let project = handle.read(cx);
                if let Some(project_id) = project.remote_id() {
                    projects.insert(project_id, handle.clone());
                    rejoined_projects.push(proto::RejoinProject {
                        id: project_id,
                        worktrees: project
                            .worktrees(cx)
                            .map(|worktree| {
                                let worktree = worktree.read(cx);
                                proto::RejoinWorktree {
                                    id: worktree.id().to_proto(),
                                    scan_id: worktree.completed_scan_id() as u64,
                                }
                            })
                            .collect(),
                    });
                }
                return true;
            }
            false
        });

        let response = self.client.request_envelope(proto::RejoinRoom {
            id: self.id,
            reshared_projects,
            rejoined_projects,
        });

        cx.spawn(|this, mut cx| async move {
            let response = response.await?;
            // The envelope's message id is forwarded to each rejoined project.
            let message_id = response.message_id;
            let response = response.payload;
            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
            this.update(&mut cx, |this, cx| {
                this.status = RoomStatus::Online;
                this.apply_room_update(room_proto, cx)?;

                for reshared_project in response.reshared_projects {
                    if let Some(project) = projects.get(&reshared_project.id) {
                        project.update(cx, |project, cx| {
                            project.reshared(reshared_project, cx).log_err();
                        });
                    }
                }

                for rejoined_project in response.rejoined_projects {
                    if let Some(project) = projects.get(&rejoined_project.id) {
                        project.update(cx, |project, cx| {
                            project.rejoined(rejoined_project, message_id, cx).log_err();
                        });
                    }
                }

                anyhow::Ok(())
            })
        })
    }
554
    /// Returns the server-assigned id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
558
    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
562
    /// Returns the state tracked for the local user in this room.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
566
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
570
571 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
572 self.remote_participants
573 .values()
574 .find(|p| p.peer_id == peer_id)
575 }
576
    /// Returns the users the server currently lists as pending participants.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
580
    /// Returns whether `user_id` belongs to a remote or pending participant
    /// of this room.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
584
585 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
586 self.follows_by_leader_id_project_id
587 .get(&(leader_id, project_id))
588 .map_or(&[], |v| v.as_slice())
589 }
590
591 async fn handle_room_updated(
592 this: ModelHandle<Self>,
593 envelope: TypedEnvelope<proto::RoomUpdated>,
594 _: Arc<Client>,
595 mut cx: AsyncAppContext,
596 ) -> Result<()> {
597 let room = envelope
598 .payload
599 .room
600 .ok_or_else(|| anyhow!("invalid room"))?;
601 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
602 }
603
    /// Reconciles local room state with a room snapshot received from the server.
    ///
    /// The user records for all remote and pending participants are requested
    /// from the user store; the rest of the work happens in a spawned task
    /// (stored in `pending_room_update`) once those lookups finish. That task
    /// updates the local participant's projects, adds/updates/removes remote
    /// participants (emitting project-shared/unshared and location-changed
    /// events along the way), replaces the pending participants, rebuilds the
    /// follower map, leaves the room if it has become empty, and publishes
    /// participant indices to the user store.
    ///
    /// Storing a new task replaces any previously stored update task.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Kick off both user lookups; they are awaited together inside the task below.
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt from scratch below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): the zip pairs each participant with the user at the
                    // same position, so it presumably relies on `get_users` returning
                    // users in the order the ids were requested — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        // Participants without a peer id are skipped entirely.
                        let Some(peer_id) = participant.peer_id else {
                            continue;
                        };
                        this.participant_user_ids.insert(participant.user_id);

                        // Diff the participant's previously known projects against the new list.
                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        for unshared_project_id in old_projects.difference(&new_projects) {
                            // Drop joined projects that were unshared (after marking them as
                            // disconnected from the host) and handles that are no longer alive.
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade(cx) {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // A location that cannot be decoded is treated as external.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            remote_participant.projects = participant.projects;
                            remote_participant.peer_id = peer_id;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            // First time this participant is seen: record them as muted and
                            // silent until track updates say otherwise.
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    participant_index: ParticipantIndex(
                                        participant.participant_index,
                                    ),
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    muted: true,
                                    speaking: false,
                                    video_tracks: Default::default(),
                                    audio_tracks: Default::default(),
                                },
                            );

                            Audio::play_sound(Sound::Joined, cx);

                            // Pick up the tracks LiveKit already has for this user by
                            // replaying them as `Subscribed` updates.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let video_tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                let audio_tracks =
                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
                                let publications = live_kit
                                    .room
                                    .remote_audio_track_publications(&user.id.to_string());

                                for track in video_tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }

                                for (track, publication) in
                                    audio_tracks.iter().zip(publications.iter())
                                {
                                    this.remote_audio_track_updated(
                                        RemoteAudioTrackUpdate::Subscribed(
                                            track.clone(),
                                            publication.clone(),
                                        ),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Remove participants that are no longer in the room and announce
                    // each of their projects as unshared.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower map from the snapshot, ignoring duplicates.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_insert(Vec::new());
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the stored task first: `should_leave` requires that no room
                // update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.user_store.update(cx, |user_store, cx| {
                    let participant_indices_by_user_id = this
                        .remote_participants
                        .iter()
                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
                        .collect();
                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
                });

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
830
831 fn remote_video_track_updated(
832 &mut self,
833 change: RemoteVideoTrackUpdate,
834 cx: &mut ModelContext<Self>,
835 ) -> Result<()> {
836 match change {
837 RemoteVideoTrackUpdate::Subscribed(track) => {
838 let user_id = track.publisher_id().parse()?;
839 let track_id = track.sid().to_string();
840 let participant = self
841 .remote_participants
842 .get_mut(&user_id)
843 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
844 participant.video_tracks.insert(
845 track_id.clone(),
846 Arc::new(RemoteVideoTrack {
847 live_kit_track: track,
848 }),
849 );
850 cx.emit(Event::RemoteVideoTracksChanged {
851 participant_id: participant.peer_id,
852 });
853 }
854 RemoteVideoTrackUpdate::Unsubscribed {
855 publisher_id,
856 track_id,
857 } => {
858 let user_id = publisher_id.parse()?;
859 let participant = self
860 .remote_participants
861 .get_mut(&user_id)
862 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
863 participant.video_tracks.remove(&track_id);
864 cx.emit(Event::RemoteVideoTracksChanged {
865 participant_id: participant.peer_id,
866 });
867 }
868 }
869
870 cx.notify();
871 Ok(())
872 }
873
874 fn remote_audio_track_updated(
875 &mut self,
876 change: RemoteAudioTrackUpdate,
877 cx: &mut ModelContext<Self>,
878 ) -> Result<()> {
879 match change {
880 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
881 let mut speaker_ids = speakers
882 .into_iter()
883 .filter_map(|speaker_sid| speaker_sid.parse().ok())
884 .collect::<Vec<u64>>();
885 speaker_ids.sort_unstable();
886 for (sid, participant) in &mut self.remote_participants {
887 if let Ok(_) = speaker_ids.binary_search(sid) {
888 participant.speaking = true;
889 } else {
890 participant.speaking = false;
891 }
892 }
893 if let Some(id) = self.client.user_id() {
894 if let Some(room) = &mut self.live_kit {
895 if let Ok(_) = speaker_ids.binary_search(&id) {
896 room.speaking = true;
897 } else {
898 room.speaking = false;
899 }
900 }
901 }
902 cx.notify();
903 }
904 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
905 let mut found = false;
906 for participant in &mut self.remote_participants.values_mut() {
907 for track in participant.audio_tracks.values() {
908 if track.sid() == track_id {
909 found = true;
910 break;
911 }
912 }
913 if found {
914 participant.muted = muted;
915 break;
916 }
917 }
918
919 cx.notify();
920 }
921 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
922 let user_id = track.publisher_id().parse()?;
923 let track_id = track.sid().to_string();
924 let participant = self
925 .remote_participants
926 .get_mut(&user_id)
927 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
928
929 participant.audio_tracks.insert(track_id.clone(), track);
930 participant.muted = publication.is_muted();
931
932 cx.emit(Event::RemoteAudioTracksChanged {
933 participant_id: participant.peer_id,
934 });
935 }
936 RemoteAudioTrackUpdate::Unsubscribed {
937 publisher_id,
938 track_id,
939 } => {
940 let user_id = publisher_id.parse()?;
941 let participant = self
942 .remote_participants
943 .get_mut(&user_id)
944 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
945 participant.audio_tracks.remove(&track_id);
946 cx.emit(Event::RemoteAudioTracksChanged {
947 participant_id: participant.peer_id,
948 });
949 }
950 }
951
952 cx.notify();
953 Ok(())
954 }
955
    /// Asserts that the participant bookkeeping is internally consistent.
    ///
    /// The checks are only compiled for tests or with the `test-support`
    /// feature; otherwise the body is empty.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked by user id and is never the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for every pending participant.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // The id set holds exactly the remote and pending participants, with no
            // user counted in both groups.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
975
976 pub(crate) fn call(
977 &mut self,
978 called_user_id: u64,
979 initial_project_id: Option<u64>,
980 cx: &mut ModelContext<Self>,
981 ) -> Task<Result<()>> {
982 if self.status.is_offline() {
983 return Task::ready(Err(anyhow!("room is offline")));
984 }
985
986 cx.notify();
987 let client = self.client.clone();
988 let room_id = self.id;
989 self.pending_call_count += 1;
990 cx.spawn(|this, mut cx| async move {
991 let result = client
992 .request(proto::Call {
993 room_id,
994 called_user_id,
995 initial_project_id,
996 })
997 .await;
998 this.update(&mut cx, |this, cx| {
999 this.pending_call_count -= 1;
1000 if this.should_leave() {
1001 this.leave(cx).detach_and_log_err(cx);
1002 }
1003 });
1004 result?;
1005 Ok(())
1006 })
1007 }
1008
1009 pub fn join_project(
1010 &mut self,
1011 id: u64,
1012 language_registry: Arc<LanguageRegistry>,
1013 fs: Arc<dyn Fs>,
1014 cx: &mut ModelContext<Self>,
1015 ) -> Task<Result<ModelHandle<Project>>> {
1016 let client = self.client.clone();
1017 let user_store = self.user_store.clone();
1018 cx.spawn(|this, mut cx| async move {
1019 let project =
1020 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1021
1022 this.update(&mut cx, |this, cx| {
1023 this.joined_projects.retain(|project| {
1024 if let Some(project) = project.upgrade(cx) {
1025 !project.read(cx).is_read_only()
1026 } else {
1027 false
1028 }
1029 });
1030 this.joined_projects.insert(project.downgrade());
1031 });
1032 Ok(project)
1033 })
1034 }
1035
1036 pub(crate) fn share_project(
1037 &mut self,
1038 project: ModelHandle<Project>,
1039 cx: &mut ModelContext<Self>,
1040 ) -> Task<Result<u64>> {
1041 if let Some(project_id) = project.read(cx).remote_id() {
1042 return Task::ready(Ok(project_id));
1043 }
1044
1045 let request = self.client.request(proto::ShareProject {
1046 room_id: self.id(),
1047 worktrees: project.read(cx).worktree_metadata_protos(cx),
1048 });
1049 cx.spawn(|this, mut cx| async move {
1050 let response = request.await?;
1051
1052 project.update(&mut cx, |project, cx| {
1053 project.shared(response.project_id, cx)
1054 })?;
1055
1056 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1057 this.update(&mut cx, |this, cx| {
1058 this.shared_projects.insert(project.downgrade());
1059 let active_project = this.local_participant.active_project.as_ref();
1060 if active_project.map_or(false, |location| *location == project) {
1061 this.set_location(Some(&project), cx)
1062 } else {
1063 Task::ready(Ok(()))
1064 }
1065 })
1066 .await?;
1067
1068 Ok(response.project_id)
1069 })
1070 }
1071
1072 pub(crate) fn unshare_project(
1073 &mut self,
1074 project: ModelHandle<Project>,
1075 cx: &mut ModelContext<Self>,
1076 ) -> Result<()> {
1077 let project_id = match project.read(cx).remote_id() {
1078 Some(project_id) => project_id,
1079 None => return Ok(()),
1080 };
1081
1082 self.client.send(proto::UnshareProject { project_id })?;
1083 project.update(cx, |this, cx| this.unshare(cx))
1084 }
1085
1086 pub(crate) fn set_location(
1087 &mut self,
1088 project: Option<&ModelHandle<Project>>,
1089 cx: &mut ModelContext<Self>,
1090 ) -> Task<Result<()>> {
1091 if self.status.is_offline() {
1092 return Task::ready(Err(anyhow!("room is offline")));
1093 }
1094
1095 let client = self.client.clone();
1096 let room_id = self.id;
1097 let location = if let Some(project) = project {
1098 self.local_participant.active_project = Some(project.downgrade());
1099 if let Some(project_id) = project.read(cx).remote_id() {
1100 proto::participant_location::Variant::SharedProject(
1101 proto::participant_location::SharedProject { id: project_id },
1102 )
1103 } else {
1104 proto::participant_location::Variant::UnsharedProject(
1105 proto::participant_location::UnsharedProject {},
1106 )
1107 }
1108 } else {
1109 self.local_participant.active_project = None;
1110 proto::participant_location::Variant::External(proto::participant_location::External {})
1111 };
1112
1113 cx.notify();
1114 cx.foreground().spawn(async move {
1115 client
1116 .request(proto::UpdateParticipantLocation {
1117 room_id,
1118 location: Some(proto::ParticipantLocation {
1119 variant: Some(location),
1120 }),
1121 })
1122 .await?;
1123 Ok(())
1124 })
1125 }
1126
1127 pub fn is_screen_sharing(&self) -> bool {
1128 self.live_kit.as_ref().map_or(false, |live_kit| {
1129 !matches!(live_kit.screen_track, LocalTrack::None)
1130 })
1131 }
1132
1133 pub fn is_sharing_mic(&self) -> bool {
1134 self.live_kit.as_ref().map_or(false, |live_kit| {
1135 !matches!(live_kit.microphone_track, LocalTrack::None)
1136 })
1137 }
1138
1139 pub fn is_muted(&self, cx: &AppContext) -> bool {
1140 self.live_kit
1141 .as_ref()
1142 .and_then(|live_kit| match &live_kit.microphone_track {
1143 LocalTrack::None => Some(Self::mute_on_join(cx)),
1144 LocalTrack::Pending { muted, .. } => Some(*muted),
1145 LocalTrack::Published { muted, .. } => Some(*muted),
1146 })
1147 .unwrap_or(false)
1148 }
1149
1150 pub fn is_speaking(&self) -> bool {
1151 self.live_kit
1152 .as_ref()
1153 .map_or(false, |live_kit| live_kit.speaking)
1154 }
1155
1156 pub fn is_deafened(&self) -> Option<bool> {
1157 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1158 }
1159
1160 #[track_caller]
1161 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1162 if self.status.is_offline() {
1163 return Task::ready(Err(anyhow!("room is offline")));
1164 } else if self.is_sharing_mic() {
1165 return Task::ready(Err(anyhow!("microphone was already shared")));
1166 }
1167
1168 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1169 let publish_id = post_inc(&mut live_kit.next_publish_id);
1170 live_kit.microphone_track = LocalTrack::Pending {
1171 publish_id,
1172 muted: false,
1173 };
1174 cx.notify();
1175 publish_id
1176 } else {
1177 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1178 };
1179
1180 cx.spawn_weak(|this, mut cx| async move {
1181 let publish_track = async {
1182 let track = LocalAudioTrack::create();
1183 this.upgrade(&cx)
1184 .ok_or_else(|| anyhow!("room was dropped"))?
1185 .read_with(&cx, |this, _| {
1186 this.live_kit
1187 .as_ref()
1188 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1189 })
1190 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1191 .await
1192 };
1193
1194 let publication = publish_track.await;
1195 this.upgrade(&cx)
1196 .ok_or_else(|| anyhow!("room was dropped"))?
1197 .update(&mut cx, |this, cx| {
1198 let live_kit = this
1199 .live_kit
1200 .as_mut()
1201 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1202
1203 let (canceled, muted) = if let LocalTrack::Pending {
1204 publish_id: cur_publish_id,
1205 muted,
1206 } = &live_kit.microphone_track
1207 {
1208 (*cur_publish_id != publish_id, *muted)
1209 } else {
1210 (true, false)
1211 };
1212
1213 match publication {
1214 Ok(publication) => {
1215 if canceled {
1216 live_kit.room.unpublish_track(publication);
1217 } else {
1218 if muted {
1219 cx.background().spawn(publication.set_mute(muted)).detach();
1220 }
1221 live_kit.microphone_track = LocalTrack::Published {
1222 track_publication: publication,
1223 muted,
1224 };
1225 cx.notify();
1226 }
1227 Ok(())
1228 }
1229 Err(error) => {
1230 if canceled {
1231 Ok(())
1232 } else {
1233 live_kit.microphone_track = LocalTrack::None;
1234 cx.notify();
1235 Err(error)
1236 }
1237 }
1238 }
1239 })
1240 })
1241 }
1242
1243 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1244 if self.status.is_offline() {
1245 return Task::ready(Err(anyhow!("room is offline")));
1246 } else if self.is_screen_sharing() {
1247 return Task::ready(Err(anyhow!("screen was already shared")));
1248 }
1249
1250 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1251 let publish_id = post_inc(&mut live_kit.next_publish_id);
1252 live_kit.screen_track = LocalTrack::Pending {
1253 publish_id,
1254 muted: false,
1255 };
1256 cx.notify();
1257 (live_kit.room.display_sources(), publish_id)
1258 } else {
1259 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1260 };
1261
1262 cx.spawn_weak(|this, mut cx| async move {
1263 let publish_track = async {
1264 let displays = displays.await?;
1265 let display = displays
1266 .first()
1267 .ok_or_else(|| anyhow!("no display found"))?;
1268 let track = LocalVideoTrack::screen_share_for_display(&display);
1269 this.upgrade(&cx)
1270 .ok_or_else(|| anyhow!("room was dropped"))?
1271 .read_with(&cx, |this, _| {
1272 this.live_kit
1273 .as_ref()
1274 .map(|live_kit| live_kit.room.publish_video_track(&track))
1275 })
1276 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1277 .await
1278 };
1279
1280 let publication = publish_track.await;
1281 this.upgrade(&cx)
1282 .ok_or_else(|| anyhow!("room was dropped"))?
1283 .update(&mut cx, |this, cx| {
1284 let live_kit = this
1285 .live_kit
1286 .as_mut()
1287 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1288
1289 let (canceled, muted) = if let LocalTrack::Pending {
1290 publish_id: cur_publish_id,
1291 muted,
1292 } = &live_kit.screen_track
1293 {
1294 (*cur_publish_id != publish_id, *muted)
1295 } else {
1296 (true, false)
1297 };
1298
1299 match publication {
1300 Ok(publication) => {
1301 if canceled {
1302 live_kit.room.unpublish_track(publication);
1303 } else {
1304 if muted {
1305 cx.background().spawn(publication.set_mute(muted)).detach();
1306 }
1307 live_kit.screen_track = LocalTrack::Published {
1308 track_publication: publication,
1309 muted,
1310 };
1311 cx.notify();
1312 }
1313
1314 Audio::play_sound(Sound::StartScreenshare, cx);
1315
1316 Ok(())
1317 }
1318 Err(error) => {
1319 if canceled {
1320 Ok(())
1321 } else {
1322 live_kit.screen_track = LocalTrack::None;
1323 cx.notify();
1324 Err(error)
1325 }
1326 }
1327 }
1328 })
1329 })
1330 }
1331
1332 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1333 let should_mute = !self.is_muted(cx);
1334 if let Some(live_kit) = self.live_kit.as_mut() {
1335 if matches!(live_kit.microphone_track, LocalTrack::None) {
1336 return Ok(self.share_microphone(cx));
1337 }
1338
1339 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1340 live_kit.muted_by_user = should_mute;
1341
1342 if old_muted == true && live_kit.deafened == true {
1343 if let Some(task) = self.toggle_deafen(cx).ok() {
1344 task.detach();
1345 }
1346 }
1347
1348 Ok(ret_task)
1349 } else {
1350 Err(anyhow!("LiveKit not started"))
1351 }
1352 }
1353
1354 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1355 if let Some(live_kit) = self.live_kit.as_mut() {
1356 (*live_kit).deafened = !live_kit.deafened;
1357
1358 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1359 // Context notification is sent within set_mute itself.
1360 let mut mute_task = None;
1361 // When deafening, mute user's mic as well.
1362 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1363 if live_kit.deafened || !live_kit.muted_by_user {
1364 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1365 };
1366 for participant in self.remote_participants.values() {
1367 for track in live_kit
1368 .room
1369 .remote_audio_track_publications(&participant.user.id.to_string())
1370 {
1371 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1372 }
1373 }
1374
1375 Ok(cx.foreground().spawn(async move {
1376 if let Some(mute_task) = mute_task {
1377 mute_task.await?;
1378 }
1379 for task in tasks {
1380 task.await?;
1381 }
1382 Ok(())
1383 }))
1384 } else {
1385 Err(anyhow!("LiveKit not started"))
1386 }
1387 }
1388
1389 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1390 if self.status.is_offline() {
1391 return Err(anyhow!("room is offline"));
1392 }
1393
1394 let live_kit = self
1395 .live_kit
1396 .as_mut()
1397 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1398 match mem::take(&mut live_kit.screen_track) {
1399 LocalTrack::None => Err(anyhow!("screen was not shared")),
1400 LocalTrack::Pending { .. } => {
1401 cx.notify();
1402 Ok(())
1403 }
1404 LocalTrack::Published {
1405 track_publication, ..
1406 } => {
1407 live_kit.room.unpublish_track(track_publication);
1408 cx.notify();
1409
1410 Audio::play_sound(Sound::StopScreenshare, cx);
1411 Ok(())
1412 }
1413 }
1414 }
1415
/// Test-only helper that forwards `sources` to the LiveKit room's
/// `set_display_sources`.
///
/// # Panics
///
/// Panics when there is no LiveKit room.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1424}
1425
1426struct LiveKitRoom {
1427 room: Arc<live_kit_client::Room>,
1428 screen_track: LocalTrack,
1429 microphone_track: LocalTrack,
1430 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1431 muted_by_user: bool,
1432 deafened: bool,
1433 speaking: bool,
1434 next_publish_id: usize,
1435 _maintain_room: Task<()>,
1436 _maintain_tracks: [Task<()>; 2],
1437}
1438
1439impl LiveKitRoom {
1440 fn set_mute(
1441 self: &mut LiveKitRoom,
1442 should_mute: bool,
1443 cx: &mut ModelContext<Room>,
1444 ) -> Result<(Task<Result<()>>, bool)> {
1445 if !should_mute {
1446 // clear user muting state.
1447 self.muted_by_user = false;
1448 }
1449
1450 let (result, old_muted) = match &mut self.microphone_track {
1451 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1452 LocalTrack::Pending { muted, .. } => {
1453 let old_muted = *muted;
1454 *muted = should_mute;
1455 cx.notify();
1456 Ok((Task::Ready(Some(Ok(()))), old_muted))
1457 }
1458 LocalTrack::Published {
1459 track_publication,
1460 muted,
1461 } => {
1462 let old_muted = *muted;
1463 *muted = should_mute;
1464 cx.notify();
1465 Ok((
1466 cx.background().spawn(track_publication.set_mute(*muted)),
1467 old_muted,
1468 ))
1469 }
1470 }?;
1471
1472 if old_muted != should_mute {
1473 if should_mute {
1474 Audio::play_sound(Sound::Mute, cx);
1475 } else {
1476 Audio::play_sound(Sound::Unmute, cx);
1477 }
1478 }
1479
1480 Ok((result, old_muted))
1481 }
1482}
1483
1484enum LocalTrack {
1485 None,
1486 Pending {
1487 publish_id: usize,
1488 muted: bool,
1489 },
1490 Published {
1491 track_publication: LocalTrackPublication,
1492 muted: bool,
1493 },
1494}
1495
1496impl Default for LocalTrack {
1497 fn default() -> Self {
1498 Self::None
1499 }
1500}
1501
/// Connection status of a room.
///
/// Derives `Debug` so the status can be logged and used in assertion
/// messages, like the other public enums in this file.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    // NOTE(review): presumably the state while reconnecting to the room; confirm
    // against the reconnect logic.
    Rejoining,
    /// The room is offline; room operations are rejected in this state.
    Offline,
}
1508
1509impl RoomStatus {
1510 pub fn is_offline(&self) -> bool {
1511 matches!(self, RoomStatus::Offline)
1512 }
1513
1514 pub fn is_online(&self) -> bool {
1515 matches!(self, RoomStatus::Online)
1516 }
1517}