1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A known remote participant reported a location different from the one
    /// previously recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track of the given participant was subscribed or unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// An audio track of the given participant was subscribed or unsubscribed.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant now lists a project that was not in their
    /// previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A remote participant no longer lists the project, or the participant
    /// owning it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave` when the local user leaves the room.
    Left,
}
49
/// Client-side model of a call room: participants, shared/joined projects and
/// the optional LiveKit audio/video session.
pub struct Room {
    /// Room id used in room-scoped requests (`Call`, `RejoinRoom`,
    /// `ShareProject`, `UpdateParticipantLocation`).
    id: u64,
    /// LiveKit state; `Some` only when connection info was supplied at
    /// construction, and taken when the room is left.
    live_kit: Option<LiveKitRoom>,
    /// Current connectivity status (`Online`, `Rejoining` or `Offline`).
    status: RoomStatus,
    /// Local projects that were shared into this room via `share_project`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Remote projects that were joined through this room via `join_project`.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending in the latest room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests currently in flight.
    pending_call_count: usize,
    /// When set, the room is left as soon as `should_leave` finds it empty.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers per `(leader, project id)`; rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions; cleared when the room is left.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, if one is still in flight.
    pending_room_update: Option<Task<()>>,
    /// Task running `Self::maintain_connection`; taken when the room is left.
    maintain_connection: Option<Task<Option<()>>>,
}
69
70impl Entity for Room {
71 type Event = Event;
72
73 fn release(&mut self, cx: &mut AppContext) {
74 if self.status.is_online() {
75 self.leave_internal(cx).detach_and_log_err(cx);
76 }
77 }
78
79 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
80 if self.status.is_online() {
81 let leave = self.leave_internal(cx);
82 Some(
83 cx.background()
84 .spawn(async move {
85 leave.await.log_err();
86 })
87 .boxed(),
88 )
89 } else {
90 None
91 }
92 }
93}
94
95impl Room {
96 fn new(
97 id: u64,
98 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
99 client: Arc<Client>,
100 user_store: ModelHandle<UserStore>,
101 cx: &mut ModelContext<Self>,
102 ) -> Self {
103 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
104 let room = live_kit_client::Room::new();
105 let mut status = room.status();
106 // Consume the initial status of the room.
107 let _ = status.try_recv();
108 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
109 while let Some(status) = status.next().await {
110 let this = if let Some(this) = this.upgrade(&cx) {
111 this
112 } else {
113 break;
114 };
115
116 if status == live_kit_client::ConnectionState::Disconnected {
117 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
118 break;
119 }
120 }
121 });
122
123 let mut track_video_changes = room.remote_video_track_updates();
124 let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
125 while let Some(track_change) = track_video_changes.next().await {
126 let this = if let Some(this) = this.upgrade(&cx) {
127 this
128 } else {
129 break;
130 };
131
132 this.update(&mut cx, |this, cx| {
133 this.remote_video_track_updated(track_change, cx).log_err()
134 });
135 }
136 });
137
138 let mut track_audio_changes = room.remote_audio_track_updates();
139 let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
140 while let Some(track_change) = track_audio_changes.next().await {
141 let this = if let Some(this) = this.upgrade(&cx) {
142 this
143 } else {
144 break;
145 };
146
147 this.update(&mut cx, |this, cx| {
148 this.remote_audio_track_updated(track_change, cx).log_err()
149 });
150 }
151 });
152
153 let connect = room.connect(&connection_info.server_url, &connection_info.token);
154 cx.spawn(|this, mut cx| async move {
155 connect.await?;
156
157 if !cx.read(|cx| settings::get::<CallSettings>(cx).mute_on_join) {
158 this.update(&mut cx, |this, cx| this.share_microphone(cx))
159 .await?;
160 }
161
162 anyhow::Ok(())
163 })
164 .detach_and_log_err(cx);
165
166 Some(LiveKitRoom {
167 room,
168 screen_track: LocalTrack::None,
169 microphone_track: LocalTrack::None,
170 next_publish_id: 0,
171 muted_by_user: false,
172 deafened: false,
173 speaking: false,
174 _maintain_room,
175 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
176 })
177 } else {
178 None
179 };
180
181 let maintain_connection =
182 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
183
184 Audio::play_sound(Sound::Joined, cx);
185
186 Self {
187 id,
188 live_kit: live_kit_room,
189 status: RoomStatus::Online,
190 shared_projects: Default::default(),
191 joined_projects: Default::default(),
192 participant_user_ids: Default::default(),
193 local_participant: Default::default(),
194 remote_participants: Default::default(),
195 pending_participants: Default::default(),
196 pending_call_count: 0,
197 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
198 leave_when_empty: false,
199 pending_room_update: None,
200 client,
201 user_store,
202 follows_by_leader_id_project_id: Default::default(),
203 maintain_connection: Some(maintain_connection),
204 }
205 }
206
207 pub(crate) fn create(
208 called_user_id: u64,
209 initial_project: Option<ModelHandle<Project>>,
210 client: Arc<Client>,
211 user_store: ModelHandle<UserStore>,
212 cx: &mut AppContext,
213 ) -> Task<Result<ModelHandle<Self>>> {
214 cx.spawn(|mut cx| async move {
215 let response = client.request(proto::CreateRoom {}).await?;
216 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
217 let room = cx.add_model(|cx| {
218 Self::new(
219 room_proto.id,
220 response.live_kit_connection_info,
221 client,
222 user_store,
223 cx,
224 )
225 });
226
227 let initial_project_id = if let Some(initial_project) = initial_project {
228 let initial_project_id = room
229 .update(&mut cx, |room, cx| {
230 room.share_project(initial_project.clone(), cx)
231 })
232 .await?;
233 Some(initial_project_id)
234 } else {
235 None
236 };
237
238 match room
239 .update(&mut cx, |room, cx| {
240 room.leave_when_empty = true;
241 room.call(called_user_id, initial_project_id, cx)
242 })
243 .await
244 {
245 Ok(()) => Ok(room),
246 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
247 }
248 })
249 }
250
251 pub(crate) fn join(
252 call: &IncomingCall,
253 client: Arc<Client>,
254 user_store: ModelHandle<UserStore>,
255 cx: &mut AppContext,
256 ) -> Task<Result<ModelHandle<Self>>> {
257 let room_id = call.room_id;
258 cx.spawn(|mut cx| async move {
259 let response = client.request(proto::JoinRoom { id: room_id }).await?;
260 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
261 let room = cx.add_model(|cx| {
262 Self::new(
263 room_id,
264 response.live_kit_connection_info,
265 client,
266 user_store,
267 cx,
268 )
269 });
270 room.update(&mut cx, |room, cx| {
271 room.leave_when_empty = true;
272 room.apply_room_update(room_proto, cx)?;
273 anyhow::Ok(())
274 })?;
275
276 Ok(room)
277 })
278 }
279
280 fn should_leave(&self) -> bool {
281 self.leave_when_empty
282 && self.pending_room_update.is_none()
283 && self.pending_participants.is_empty()
284 && self.remote_participants.is_empty()
285 && self.pending_call_count == 0
286 }
287
    /// Leaves the room: notifies observers, emits [`Event::Left`] and then
    /// delegates the actual teardown to `leave_internal`, whose task is returned.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
293
294 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
295 if self.status.is_offline() {
296 return Task::ready(Err(anyhow!("room is offline")));
297 }
298
299 log::info!("leaving room");
300
301 for project in self.shared_projects.drain() {
302 if let Some(project) = project.upgrade(cx) {
303 project.update(cx, |project, cx| {
304 project.unshare(cx).log_err();
305 });
306 }
307 }
308 for project in self.joined_projects.drain() {
309 if let Some(project) = project.upgrade(cx) {
310 project.update(cx, |project, cx| {
311 project.disconnected_from_host(cx);
312 project.close(cx);
313 });
314 }
315 }
316
317 Audio::play_sound(Sound::Leave, cx);
318
319 self.status = RoomStatus::Offline;
320 self.remote_participants.clear();
321 self.pending_participants.clear();
322 self.participant_user_ids.clear();
323 self.subscriptions.clear();
324 self.live_kit.take();
325 self.pending_room_update.take();
326 self.maintain_connection.take();
327
328 let leave_room = self.client.request(proto::LeaveRoom {});
329 cx.background().spawn(async move {
330 leave_room.await?;
331 anyhow::Ok(())
332 })
333 }
334
335 async fn maintain_connection(
336 this: WeakModelHandle<Self>,
337 client: Arc<Client>,
338 mut cx: AsyncAppContext,
339 ) -> Result<()> {
340 let mut client_status = client.status();
341 loop {
342 let _ = client_status.try_recv();
343 let is_connected = client_status.borrow().is_connected();
344 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
345 if !is_connected || client_status.next().await.is_some() {
346 log::info!("detected client disconnection");
347
348 this.upgrade(&cx)
349 .ok_or_else(|| anyhow!("room was dropped"))?
350 .update(&mut cx, |this, cx| {
351 this.status = RoomStatus::Rejoining;
352 cx.notify();
353 });
354
355 // Wait for client to re-establish a connection to the server.
356 {
357 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
358 let client_reconnection = async {
359 let mut remaining_attempts = 3;
360 while remaining_attempts > 0 {
361 if client_status.borrow().is_connected() {
362 log::info!("client reconnected, attempting to rejoin room");
363
364 let Some(this) = this.upgrade(&cx) else { break };
365 if this
366 .update(&mut cx, |this, cx| this.rejoin(cx))
367 .await
368 .log_err()
369 .is_some()
370 {
371 return true;
372 } else {
373 remaining_attempts -= 1;
374 }
375 } else if client_status.borrow().is_signed_out() {
376 return false;
377 }
378
379 log::info!(
380 "waiting for client status change, remaining attempts {}",
381 remaining_attempts
382 );
383 client_status.next().await;
384 }
385 false
386 }
387 .fuse();
388 futures::pin_mut!(client_reconnection);
389
390 futures::select_biased! {
391 reconnected = client_reconnection => {
392 if reconnected {
393 log::info!("successfully reconnected to room");
394 // If we successfully joined the room, go back around the loop
395 // waiting for future connection status changes.
396 continue;
397 }
398 }
399 _ = reconnection_timeout => {
400 log::info!("room reconnection timeout expired");
401 }
402 }
403 }
404
405 break;
406 }
407 }
408
409 // The client failed to re-establish a connection to the server
410 // or an error occurred while trying to re-join the room. Either way
411 // we leave the room and return an error.
412 if let Some(this) = this.upgrade(&cx) {
413 log::info!("reconnection failed, leaving room");
414 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
415 }
416 Err(anyhow!(
417 "can't reconnect to room: client failed to re-establish connection"
418 ))
419 }
420
421 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
422 let mut projects = HashMap::default();
423 let mut reshared_projects = Vec::new();
424 let mut rejoined_projects = Vec::new();
425 self.shared_projects.retain(|project| {
426 if let Some(handle) = project.upgrade(cx) {
427 let project = handle.read(cx);
428 if let Some(project_id) = project.remote_id() {
429 projects.insert(project_id, handle.clone());
430 reshared_projects.push(proto::UpdateProject {
431 project_id,
432 worktrees: project.worktree_metadata_protos(cx),
433 });
434 return true;
435 }
436 }
437 false
438 });
439 self.joined_projects.retain(|project| {
440 if let Some(handle) = project.upgrade(cx) {
441 let project = handle.read(cx);
442 if let Some(project_id) = project.remote_id() {
443 projects.insert(project_id, handle.clone());
444 rejoined_projects.push(proto::RejoinProject {
445 id: project_id,
446 worktrees: project
447 .worktrees(cx)
448 .map(|worktree| {
449 let worktree = worktree.read(cx);
450 proto::RejoinWorktree {
451 id: worktree.id().to_proto(),
452 scan_id: worktree.completed_scan_id() as u64,
453 }
454 })
455 .collect(),
456 });
457 }
458 return true;
459 }
460 false
461 });
462
463 let response = self.client.request_envelope(proto::RejoinRoom {
464 id: self.id,
465 reshared_projects,
466 rejoined_projects,
467 });
468
469 cx.spawn(|this, mut cx| async move {
470 let response = response.await?;
471 let message_id = response.message_id;
472 let response = response.payload;
473 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
474 this.update(&mut cx, |this, cx| {
475 this.status = RoomStatus::Online;
476 this.apply_room_update(room_proto, cx)?;
477
478 for reshared_project in response.reshared_projects {
479 if let Some(project) = projects.get(&reshared_project.id) {
480 project.update(cx, |project, cx| {
481 project.reshared(reshared_project, cx).log_err();
482 });
483 }
484 }
485
486 for rejoined_project in response.rejoined_projects {
487 if let Some(project) = projects.get(&rejoined_project.id) {
488 project.update(cx, |project, cx| {
489 project.rejoined(rejoined_project, message_id, cx).log_err();
490 });
491 }
492 }
493
494 anyhow::Ok(())
495 })
496 })
497 }
498
    /// Returns the id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
502
    /// Returns the room's current connectivity status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
506
    /// Returns the state tracked for the local user in this room.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
510
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
514
515 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
516 self.remote_participants
517 .values()
518 .find(|p| p.peer_id == peer_id)
519 }
520
    /// Returns the users listed as pending participants in the latest applied
    /// room update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
524
    /// Returns whether `user_id` is among the room's remote or pending
    /// participants.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
528
529 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
530 self.follows_by_leader_id_project_id
531 .get(&(leader_id, project_id))
532 .map_or(&[], |v| v.as_slice())
533 }
534
535 async fn handle_room_updated(
536 this: ModelHandle<Self>,
537 envelope: TypedEnvelope<proto::RoomUpdated>,
538 _: Arc<Client>,
539 mut cx: AsyncAppContext,
540 ) -> Result<()> {
541 let room = envelope
542 .payload
543 .room
544 .ok_or_else(|| anyhow!("invalid room"))?;
545 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
546 }
547
548 fn apply_room_update(
549 &mut self,
550 mut room: proto::Room,
551 cx: &mut ModelContext<Self>,
552 ) -> Result<()> {
553 // Filter ourselves out from the room's participants.
554 let local_participant_ix = room
555 .participants
556 .iter()
557 .position(|participant| Some(participant.user_id) == self.client.user_id());
558 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
559
560 let pending_participant_user_ids = room
561 .pending_participants
562 .iter()
563 .map(|p| p.user_id)
564 .collect::<Vec<_>>();
565
566 let remote_participant_user_ids = room
567 .participants
568 .iter()
569 .map(|p| p.user_id)
570 .collect::<Vec<_>>();
571
572 let (remote_participants, pending_participants) =
573 self.user_store.update(cx, move |user_store, cx| {
574 (
575 user_store.get_users(remote_participant_user_ids, cx),
576 user_store.get_users(pending_participant_user_ids, cx),
577 )
578 });
579
580 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
581 let (remote_participants, pending_participants) =
582 futures::join!(remote_participants, pending_participants);
583
584 this.update(&mut cx, |this, cx| {
585 this.participant_user_ids.clear();
586
587 if let Some(participant) = local_participant {
588 this.local_participant.projects = participant.projects;
589 } else {
590 this.local_participant.projects.clear();
591 }
592
593 if let Some(participants) = remote_participants.log_err() {
594 for (participant, user) in room.participants.into_iter().zip(participants) {
595 let Some(peer_id) = participant.peer_id else { continue };
596 this.participant_user_ids.insert(participant.user_id);
597
598 let old_projects = this
599 .remote_participants
600 .get(&participant.user_id)
601 .into_iter()
602 .flat_map(|existing| &existing.projects)
603 .map(|project| project.id)
604 .collect::<HashSet<_>>();
605 let new_projects = participant
606 .projects
607 .iter()
608 .map(|project| project.id)
609 .collect::<HashSet<_>>();
610
611 for project in &participant.projects {
612 if !old_projects.contains(&project.id) {
613 cx.emit(Event::RemoteProjectShared {
614 owner: user.clone(),
615 project_id: project.id,
616 worktree_root_names: project.worktree_root_names.clone(),
617 });
618 }
619 }
620
621 for unshared_project_id in old_projects.difference(&new_projects) {
622 this.joined_projects.retain(|project| {
623 if let Some(project) = project.upgrade(cx) {
624 project.update(cx, |project, cx| {
625 if project.remote_id() == Some(*unshared_project_id) {
626 project.disconnected_from_host(cx);
627 false
628 } else {
629 true
630 }
631 })
632 } else {
633 false
634 }
635 });
636 cx.emit(Event::RemoteProjectUnshared {
637 project_id: *unshared_project_id,
638 });
639 }
640
641 let location = ParticipantLocation::from_proto(participant.location)
642 .unwrap_or(ParticipantLocation::External);
643 if let Some(remote_participant) =
644 this.remote_participants.get_mut(&participant.user_id)
645 {
646 remote_participant.projects = participant.projects;
647 remote_participant.peer_id = peer_id;
648 if location != remote_participant.location {
649 remote_participant.location = location;
650 cx.emit(Event::ParticipantLocationChanged {
651 participant_id: peer_id,
652 });
653 }
654 } else {
655 this.remote_participants.insert(
656 participant.user_id,
657 RemoteParticipant {
658 user: user.clone(),
659 peer_id,
660 projects: participant.projects,
661 location,
662 muted: true,
663 speaking: false,
664 video_tracks: Default::default(),
665 audio_tracks: Default::default(),
666 },
667 );
668
669 Audio::play_sound(Sound::Joined, cx);
670
671 if let Some(live_kit) = this.live_kit.as_ref() {
672 let video_tracks =
673 live_kit.room.remote_video_tracks(&user.id.to_string());
674 let audio_tracks =
675 live_kit.room.remote_audio_tracks(&user.id.to_string());
676 let publications = live_kit
677 .room
678 .remote_audio_track_publications(&user.id.to_string());
679
680 for track in video_tracks {
681 this.remote_video_track_updated(
682 RemoteVideoTrackUpdate::Subscribed(track),
683 cx,
684 )
685 .log_err();
686 }
687
688 for (track, publication) in
689 audio_tracks.iter().zip(publications.iter())
690 {
691 this.remote_audio_track_updated(
692 RemoteAudioTrackUpdate::Subscribed(
693 track.clone(),
694 publication.clone(),
695 ),
696 cx,
697 )
698 .log_err();
699 }
700 }
701 }
702 }
703
704 this.remote_participants.retain(|user_id, participant| {
705 if this.participant_user_ids.contains(user_id) {
706 true
707 } else {
708 for project in &participant.projects {
709 cx.emit(Event::RemoteProjectUnshared {
710 project_id: project.id,
711 });
712 }
713 false
714 }
715 });
716 }
717
718 if let Some(pending_participants) = pending_participants.log_err() {
719 this.pending_participants = pending_participants;
720 for participant in &this.pending_participants {
721 this.participant_user_ids.insert(participant.id);
722 }
723 }
724
725 this.follows_by_leader_id_project_id.clear();
726 for follower in room.followers {
727 let project_id = follower.project_id;
728 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
729 (Some(leader), Some(follower)) => (leader, follower),
730
731 _ => {
732 log::error!("Follower message {follower:?} missing some state");
733 continue;
734 }
735 };
736
737 let list = this
738 .follows_by_leader_id_project_id
739 .entry((leader, project_id))
740 .or_insert(Vec::new());
741 if !list.contains(&follower) {
742 list.push(follower);
743 }
744 }
745
746 this.pending_room_update.take();
747 if this.should_leave() {
748 log::info!("room is empty, leaving");
749 let _ = this.leave(cx);
750 }
751
752 this.check_invariants();
753 cx.notify();
754 });
755 }));
756
757 cx.notify();
758 Ok(())
759 }
760
761 fn remote_video_track_updated(
762 &mut self,
763 change: RemoteVideoTrackUpdate,
764 cx: &mut ModelContext<Self>,
765 ) -> Result<()> {
766 match change {
767 RemoteVideoTrackUpdate::Subscribed(track) => {
768 let user_id = track.publisher_id().parse()?;
769 let track_id = track.sid().to_string();
770 let participant = self
771 .remote_participants
772 .get_mut(&user_id)
773 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
774 participant.video_tracks.insert(
775 track_id.clone(),
776 Arc::new(RemoteVideoTrack {
777 live_kit_track: track,
778 }),
779 );
780 cx.emit(Event::RemoteVideoTracksChanged {
781 participant_id: participant.peer_id,
782 });
783 }
784 RemoteVideoTrackUpdate::Unsubscribed {
785 publisher_id,
786 track_id,
787 } => {
788 let user_id = publisher_id.parse()?;
789 let participant = self
790 .remote_participants
791 .get_mut(&user_id)
792 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
793 participant.video_tracks.remove(&track_id);
794 cx.emit(Event::RemoteVideoTracksChanged {
795 participant_id: participant.peer_id,
796 });
797 }
798 }
799
800 cx.notify();
801 Ok(())
802 }
803
804 fn remote_audio_track_updated(
805 &mut self,
806 change: RemoteAudioTrackUpdate,
807 cx: &mut ModelContext<Self>,
808 ) -> Result<()> {
809 match change {
810 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
811 let mut speaker_ids = speakers
812 .into_iter()
813 .filter_map(|speaker_sid| speaker_sid.parse().ok())
814 .collect::<Vec<u64>>();
815 speaker_ids.sort_unstable();
816 for (sid, participant) in &mut self.remote_participants {
817 if let Ok(_) = speaker_ids.binary_search(sid) {
818 participant.speaking = true;
819 } else {
820 participant.speaking = false;
821 }
822 }
823 if let Some(id) = self.client.user_id() {
824 if let Some(room) = &mut self.live_kit {
825 if let Ok(_) = speaker_ids.binary_search(&id) {
826 room.speaking = true;
827 } else {
828 room.speaking = false;
829 }
830 }
831 }
832 cx.notify();
833 }
834 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
835 let mut found = false;
836 for participant in &mut self.remote_participants.values_mut() {
837 for track in participant.audio_tracks.values() {
838 if track.sid() == track_id {
839 found = true;
840 break;
841 }
842 }
843 if found {
844 participant.muted = muted;
845 break;
846 }
847 }
848
849 cx.notify();
850 }
851 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
852 let user_id = track.publisher_id().parse()?;
853 let track_id = track.sid().to_string();
854 let participant = self
855 .remote_participants
856 .get_mut(&user_id)
857 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
858
859 participant.audio_tracks.insert(track_id.clone(), track);
860 participant.muted = publication.is_muted();
861
862 cx.emit(Event::RemoteAudioTracksChanged {
863 participant_id: participant.peer_id,
864 });
865 }
866 RemoteAudioTrackUpdate::Unsubscribed {
867 publisher_id,
868 track_id,
869 } => {
870 let user_id = publisher_id.parse()?;
871 let participant = self
872 .remote_participants
873 .get_mut(&user_id)
874 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
875 participant.audio_tracks.remove(&track_id);
876 cx.emit(Event::RemoteAudioTracksChanged {
877 participant_id: participant.peer_id,
878 });
879 }
880 }
881
882 cx.notify();
883 Ok(())
884 }
885
    /// Asserts internal consistency of the participant bookkeeping.
    ///
    /// The checks are only compiled for tests or with the `test-support`
    /// feature; otherwise this function does nothing.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked by user id and is not the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for every pending participant.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // No user id is tracked beyond remote + pending participants.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
905
906 pub(crate) fn call(
907 &mut self,
908 called_user_id: u64,
909 initial_project_id: Option<u64>,
910 cx: &mut ModelContext<Self>,
911 ) -> Task<Result<()>> {
912 if self.status.is_offline() {
913 return Task::ready(Err(anyhow!("room is offline")));
914 }
915
916 cx.notify();
917 let client = self.client.clone();
918 let room_id = self.id;
919 self.pending_call_count += 1;
920 cx.spawn(|this, mut cx| async move {
921 let result = client
922 .request(proto::Call {
923 room_id,
924 called_user_id,
925 initial_project_id,
926 })
927 .await;
928 this.update(&mut cx, |this, cx| {
929 this.pending_call_count -= 1;
930 if this.should_leave() {
931 this.leave(cx).detach_and_log_err(cx);
932 }
933 });
934 result?;
935 Ok(())
936 })
937 }
938
939 pub fn join_project(
940 &mut self,
941 id: u64,
942 language_registry: Arc<LanguageRegistry>,
943 fs: Arc<dyn Fs>,
944 cx: &mut ModelContext<Self>,
945 ) -> Task<Result<ModelHandle<Project>>> {
946 let client = self.client.clone();
947 let user_store = self.user_store.clone();
948 cx.spawn(|this, mut cx| async move {
949 let project =
950 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
951
952 this.update(&mut cx, |this, cx| {
953 this.joined_projects.retain(|project| {
954 if let Some(project) = project.upgrade(cx) {
955 !project.read(cx).is_read_only()
956 } else {
957 false
958 }
959 });
960 this.joined_projects.insert(project.downgrade());
961 });
962 Ok(project)
963 })
964 }
965
966 pub(crate) fn share_project(
967 &mut self,
968 project: ModelHandle<Project>,
969 cx: &mut ModelContext<Self>,
970 ) -> Task<Result<u64>> {
971 if let Some(project_id) = project.read(cx).remote_id() {
972 return Task::ready(Ok(project_id));
973 }
974
975 let request = self.client.request(proto::ShareProject {
976 room_id: self.id(),
977 worktrees: project.read(cx).worktree_metadata_protos(cx),
978 });
979 cx.spawn(|this, mut cx| async move {
980 let response = request.await?;
981
982 project.update(&mut cx, |project, cx| {
983 project.shared(response.project_id, cx)
984 })?;
985
986 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
987 this.update(&mut cx, |this, cx| {
988 this.shared_projects.insert(project.downgrade());
989 let active_project = this.local_participant.active_project.as_ref();
990 if active_project.map_or(false, |location| *location == project) {
991 this.set_location(Some(&project), cx)
992 } else {
993 Task::ready(Ok(()))
994 }
995 })
996 .await?;
997
998 Ok(response.project_id)
999 })
1000 }
1001
1002 pub(crate) fn unshare_project(
1003 &mut self,
1004 project: ModelHandle<Project>,
1005 cx: &mut ModelContext<Self>,
1006 ) -> Result<()> {
1007 let project_id = match project.read(cx).remote_id() {
1008 Some(project_id) => project_id,
1009 None => return Ok(()),
1010 };
1011
1012 self.client.send(proto::UnshareProject { project_id })?;
1013 project.update(cx, |this, cx| this.unshare(cx))
1014 }
1015
1016 pub(crate) fn set_location(
1017 &mut self,
1018 project: Option<&ModelHandle<Project>>,
1019 cx: &mut ModelContext<Self>,
1020 ) -> Task<Result<()>> {
1021 if self.status.is_offline() {
1022 return Task::ready(Err(anyhow!("room is offline")));
1023 }
1024
1025 let client = self.client.clone();
1026 let room_id = self.id;
1027 let location = if let Some(project) = project {
1028 self.local_participant.active_project = Some(project.downgrade());
1029 if let Some(project_id) = project.read(cx).remote_id() {
1030 proto::participant_location::Variant::SharedProject(
1031 proto::participant_location::SharedProject { id: project_id },
1032 )
1033 } else {
1034 proto::participant_location::Variant::UnsharedProject(
1035 proto::participant_location::UnsharedProject {},
1036 )
1037 }
1038 } else {
1039 self.local_participant.active_project = None;
1040 proto::participant_location::Variant::External(proto::participant_location::External {})
1041 };
1042
1043 cx.notify();
1044 cx.foreground().spawn(async move {
1045 client
1046 .request(proto::UpdateParticipantLocation {
1047 room_id,
1048 location: Some(proto::ParticipantLocation {
1049 variant: Some(location),
1050 }),
1051 })
1052 .await?;
1053 Ok(())
1054 })
1055 }
1056
1057 pub fn is_screen_sharing(&self) -> bool {
1058 self.live_kit.as_ref().map_or(false, |live_kit| {
1059 !matches!(live_kit.screen_track, LocalTrack::None)
1060 })
1061 }
1062
1063 pub fn is_sharing_mic(&self) -> bool {
1064 self.live_kit.as_ref().map_or(false, |live_kit| {
1065 !matches!(live_kit.microphone_track, LocalTrack::None)
1066 })
1067 }
1068
1069 pub fn is_muted(&self) -> bool {
1070 self.live_kit
1071 .as_ref()
1072 .and_then(|live_kit| match &live_kit.microphone_track {
1073 LocalTrack::None => Some(true),
1074 LocalTrack::Pending { muted, .. } => Some(*muted),
1075 LocalTrack::Published { muted, .. } => Some(*muted),
1076 })
1077 .unwrap_or(false)
1078 }
1079
1080 pub fn is_speaking(&self) -> bool {
1081 self.live_kit
1082 .as_ref()
1083 .map_or(false, |live_kit| live_kit.speaking)
1084 }
1085
1086 pub fn is_deafened(&self) -> Option<bool> {
1087 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1088 }
1089
1090 #[track_caller]
1091 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1092 if self.status.is_offline() {
1093 return Task::ready(Err(anyhow!("room is offline")));
1094 } else if self.is_sharing_mic() {
1095 return Task::ready(Err(anyhow!("microphone was already shared")));
1096 }
1097
1098 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1099 let publish_id = post_inc(&mut live_kit.next_publish_id);
1100 live_kit.microphone_track = LocalTrack::Pending {
1101 publish_id,
1102 muted: false,
1103 };
1104 cx.notify();
1105 publish_id
1106 } else {
1107 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1108 };
1109
1110 cx.spawn_weak(|this, mut cx| async move {
1111 let publish_track = async {
1112 let track = LocalAudioTrack::create();
1113 this.upgrade(&cx)
1114 .ok_or_else(|| anyhow!("room was dropped"))?
1115 .read_with(&cx, |this, _| {
1116 this.live_kit
1117 .as_ref()
1118 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1119 })
1120 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1121 .await
1122 };
1123
1124 let publication = publish_track.await;
1125 this.upgrade(&cx)
1126 .ok_or_else(|| anyhow!("room was dropped"))?
1127 .update(&mut cx, |this, cx| {
1128 let live_kit = this
1129 .live_kit
1130 .as_mut()
1131 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1132
1133 let (canceled, muted) = if let LocalTrack::Pending {
1134 publish_id: cur_publish_id,
1135 muted,
1136 } = &live_kit.microphone_track
1137 {
1138 (*cur_publish_id != publish_id, *muted)
1139 } else {
1140 (true, false)
1141 };
1142
1143 match publication {
1144 Ok(publication) => {
1145 if canceled {
1146 live_kit.room.unpublish_track(publication);
1147 } else {
1148 if muted {
1149 cx.background().spawn(publication.set_mute(muted)).detach();
1150 }
1151 live_kit.microphone_track = LocalTrack::Published {
1152 track_publication: publication,
1153 muted,
1154 };
1155 cx.notify();
1156 }
1157 Ok(())
1158 }
1159 Err(error) => {
1160 if canceled {
1161 Ok(())
1162 } else {
1163 live_kit.microphone_track = LocalTrack::None;
1164 cx.notify();
1165 Err(error)
1166 }
1167 }
1168 }
1169 })
1170 })
1171 }
1172
1173 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1174 if self.status.is_offline() {
1175 return Task::ready(Err(anyhow!("room is offline")));
1176 } else if self.is_screen_sharing() {
1177 return Task::ready(Err(anyhow!("screen was already shared")));
1178 }
1179
1180 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1181 let publish_id = post_inc(&mut live_kit.next_publish_id);
1182 live_kit.screen_track = LocalTrack::Pending {
1183 publish_id,
1184 muted: false,
1185 };
1186 cx.notify();
1187 (live_kit.room.display_sources(), publish_id)
1188 } else {
1189 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1190 };
1191
1192 cx.spawn_weak(|this, mut cx| async move {
1193 let publish_track = async {
1194 let displays = displays.await?;
1195 let display = displays
1196 .first()
1197 .ok_or_else(|| anyhow!("no display found"))?;
1198 let track = LocalVideoTrack::screen_share_for_display(&display);
1199 this.upgrade(&cx)
1200 .ok_or_else(|| anyhow!("room was dropped"))?
1201 .read_with(&cx, |this, _| {
1202 this.live_kit
1203 .as_ref()
1204 .map(|live_kit| live_kit.room.publish_video_track(&track))
1205 })
1206 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1207 .await
1208 };
1209
1210 let publication = publish_track.await;
1211 this.upgrade(&cx)
1212 .ok_or_else(|| anyhow!("room was dropped"))?
1213 .update(&mut cx, |this, cx| {
1214 let live_kit = this
1215 .live_kit
1216 .as_mut()
1217 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1218
1219 let (canceled, muted) = if let LocalTrack::Pending {
1220 publish_id: cur_publish_id,
1221 muted,
1222 } = &live_kit.screen_track
1223 {
1224 (*cur_publish_id != publish_id, *muted)
1225 } else {
1226 (true, false)
1227 };
1228
1229 match publication {
1230 Ok(publication) => {
1231 if canceled {
1232 live_kit.room.unpublish_track(publication);
1233 } else {
1234 if muted {
1235 cx.background().spawn(publication.set_mute(muted)).detach();
1236 }
1237 live_kit.screen_track = LocalTrack::Published {
1238 track_publication: publication,
1239 muted,
1240 };
1241 cx.notify();
1242 }
1243
1244 Audio::play_sound(Sound::StartScreenshare, cx);
1245
1246 Ok(())
1247 }
1248 Err(error) => {
1249 if canceled {
1250 Ok(())
1251 } else {
1252 live_kit.screen_track = LocalTrack::None;
1253 cx.notify();
1254 Err(error)
1255 }
1256 }
1257 }
1258 })
1259 })
1260 }
1261
1262 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1263 let should_mute = !self.is_muted();
1264 if let Some(live_kit) = self.live_kit.as_mut() {
1265 if matches!(live_kit.microphone_track, LocalTrack::None) {
1266 return Ok(self.share_microphone(cx));
1267 }
1268
1269 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1270 live_kit.muted_by_user = should_mute;
1271
1272 if old_muted == true && live_kit.deafened == true {
1273 if let Some(task) = self.toggle_deafen(cx).ok() {
1274 task.detach();
1275 }
1276 }
1277
1278 Ok(ret_task)
1279 } else {
1280 Err(anyhow!("LiveKit not started"))
1281 }
1282 }
1283
1284 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1285 if let Some(live_kit) = self.live_kit.as_mut() {
1286 (*live_kit).deafened = !live_kit.deafened;
1287
1288 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1289 // Context notification is sent within set_mute itself.
1290 let mut mute_task = None;
1291 // When deafening, mute user's mic as well.
1292 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1293 if live_kit.deafened || !live_kit.muted_by_user {
1294 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1295 };
1296 for participant in self.remote_participants.values() {
1297 for track in live_kit
1298 .room
1299 .remote_audio_track_publications(&participant.user.id.to_string())
1300 {
1301 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1302 }
1303 }
1304
1305 Ok(cx.foreground().spawn(async move {
1306 if let Some(mute_task) = mute_task {
1307 mute_task.await?;
1308 }
1309 for task in tasks {
1310 task.await?;
1311 }
1312 Ok(())
1313 }))
1314 } else {
1315 Err(anyhow!("LiveKit not started"))
1316 }
1317 }
1318
1319 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1320 if self.status.is_offline() {
1321 return Err(anyhow!("room is offline"));
1322 }
1323
1324 let live_kit = self
1325 .live_kit
1326 .as_mut()
1327 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1328 match mem::take(&mut live_kit.screen_track) {
1329 LocalTrack::None => Err(anyhow!("screen was not shared")),
1330 LocalTrack::Pending { .. } => {
1331 cx.notify();
1332 Ok(())
1333 }
1334 LocalTrack::Published {
1335 track_publication, ..
1336 } => {
1337 live_kit.room.unpublish_track(track_publication);
1338 cx.notify();
1339
1340 Audio::play_sound(Sound::StopScreenshare, cx);
1341 Ok(())
1342 }
1343 }
1344 }
1345
1346 #[cfg(any(test, feature = "test-support"))]
1347 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1348 self.live_kit
1349 .as_ref()
1350 .unwrap()
1351 .room
1352 .set_display_sources(sources);
1353 }
1354}
1355
/// LiveKit-related state held by a `Room`: the LiveKit room handle, the local tracks and
/// the local mute/deafen flags.
struct LiveKitRoom {
    /// Handle to the LiveKit room, used to publish and unpublish local tracks and to look
    /// up remote audio track publications.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the current mute was requested by the user through `toggle_mute`, as
    /// opposed to being applied automatically when deafening. Cleared by `set_mute`
    /// whenever the microphone is unmuted; `toggle_deafen` consults it so that
    /// undeafening does not undo a manual mute.
    muted_by_user: bool,
    /// Whether the room is currently deafened; flipped by `toggle_deafen`.
    deafened: bool,
    /// Speaking flag reported by `is_speaking`.
    speaking: bool,
    /// Id handed to the next share attempt; incremented for every attempt so that a
    /// finishing publish can tell whether it is still the current one.
    next_publish_id: usize,
    // NOTE(review): presumably held only so the background tasks live as long as this
    // struct; confirm where they are spawned.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1368
1369impl LiveKitRoom {
1370 fn set_mute(
1371 self: &mut LiveKitRoom,
1372 should_mute: bool,
1373 cx: &mut ModelContext<Room>,
1374 ) -> Result<(Task<Result<()>>, bool)> {
1375 if !should_mute {
1376 // clear user muting state.
1377 self.muted_by_user = false;
1378 }
1379
1380 let (result, old_muted) = match &mut self.microphone_track {
1381 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1382 LocalTrack::Pending { muted, .. } => {
1383 let old_muted = *muted;
1384 *muted = should_mute;
1385 cx.notify();
1386 Ok((Task::Ready(Some(Ok(()))), old_muted))
1387 }
1388 LocalTrack::Published {
1389 track_publication,
1390 muted,
1391 } => {
1392 let old_muted = *muted;
1393 *muted = should_mute;
1394 cx.notify();
1395 Ok((
1396 cx.background().spawn(track_publication.set_mute(*muted)),
1397 old_muted,
1398 ))
1399 }
1400 }?;
1401
1402 if old_muted != should_mute {
1403 if should_mute {
1404 Audio::play_sound(Sound::Mute, cx);
1405 } else {
1406 Audio::play_sound(Sound::Unmute, cx);
1407 }
1408 }
1409
1410 Ok((result, old_muted))
1411 }
1412}
1413
/// Publication state of a local track (microphone or screen share).
enum LocalTrack {
    /// The track is not shared.
    None,
    /// A publish attempt is in flight.
    Pending {
        /// Id of the in-flight attempt; a finishing publish compares it against its own
        /// id to detect that it has been superseded or canceled.
        publish_id: usize,
        /// Mute state requested while pending; applied once the track is published.
        muted: bool,
    },
    /// The track has been published to the LiveKit room.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Mute state last applied to the track.
        muted: bool,
    },
}
1425
1426impl Default for LocalTrack {
1427 fn default() -> Self {
1428 Self::None
1429 }
1430}
1431
/// Connection status of a room.
///
/// `Debug` is derived so the status can be logged and used in assertions such as
/// `assert_eq!`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}
1438
1439impl RoomStatus {
1440 pub fn is_offline(&self) -> bool {
1441 matches!(self, RoomStatus::Offline)
1442 }
1443
1444 pub fn is_online(&self) -> bool {
1445 matches!(self, RoomStatus::Online)
1446 }
1447}