1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
15use language::LanguageRegistry;
16use live_kit_client::{
17 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
18 RemoteVideoTrackUpdate,
19};
20use postage::stream::Stream;
21use project::Project;
22use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
23use util::{post_inc, ResultExt, TryFutureExt};
24
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A known remote participant's location differs from the one previously
    /// recorded for them (emitted while applying a room update).
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track was subscribed to or unsubscribed from for the given participant.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// An audio track was subscribed to or unsubscribed from for the given participant.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not in
    /// that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project disappeared from a remote participant's project list, or the
    /// participant owning it is no longer part of the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
48
/// Client-side state of a call room: its participants, the projects shared in
/// or joined through it, and the optional LiveKit audio/video session.
pub struct Room {
    // Room id; sent with requests such as `Call`, `ShareProject` and `RejoinRoom`.
    id: u64,
    // Present only when the room was constructed with LiveKit connection info;
    // taken (set to `None`) when the room is left.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    // Projects registered by `share_project`; unshared when the room is left.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    // Projects registered by `join_project`; closed when the room is left.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    // Keyed by the participant's user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    // Users listed as pending in the most recently applied room update.
    pending_participants: Vec<Arc<User>>,
    // User ids of the remote and pending participants (see `check_invariants`).
    participant_user_ids: HashSet<u64>,
    // Number of `Call` requests that have been issued but have not completed yet.
    pending_call_count: usize,
    // When set, the room is left once `should_leave` finds it empty and idle.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    // Followers grouped by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Holds the message-handler registration for `handle_room_updated`.
    subscriptions: Vec<client::Subscription>,
    // Task applying the most recent room update; cleared by that task when it finishes.
    pending_room_update: Option<Task<()>>,
    // Task running `Self::maintain_connection`; taken when the room is left.
    maintain_connection: Option<Task<Option<()>>>,
}
68
69impl Entity for Room {
70 type Event = Event;
71
72 fn release(&mut self, cx: &mut AppContext) {
73 if self.status.is_online() {
74 self.leave_internal(cx).detach_and_log_err(cx);
75 }
76 }
77
78 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
79 if self.status.is_online() {
80 let leave = self.leave_internal(cx);
81 Some(
82 cx.background()
83 .spawn(async move {
84 leave.await.log_err();
85 })
86 .boxed(),
87 )
88 } else {
89 None
90 }
91 }
92}
93
94impl Room {
/// Builds the in-memory state for a room with the given `id`.
///
/// When `live_kit_connection_info` is present, this also creates the LiveKit
/// room, spawns the tasks that watch its connection status and its remote
/// video/audio track updates, and starts connecting (sharing the microphone
/// once the connection succeeds). Regardless of LiveKit, it spawns
/// `Self::maintain_connection`, plays the "joined" sound, and registers
/// `Self::handle_room_updated` as a message handler on `client`.
fn new(
    id: u64,
    live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    cx: &mut ModelContext<Self>,
) -> Self {
    let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
        let room = live_kit_client::Room::new();
        let mut status = room.status();
        // Consume the initial status of the room.
        let _ = status.try_recv();
        // Leave the room as soon as LiveKit reports that it disconnected. The task
        // only holds a weak handle and stops once the room model is gone.
        let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
            while let Some(status) = status.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                if status == live_kit_client::ConnectionState::Disconnected {
                    this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                    break;
                }
            }
        });

        // Forward remote video track changes to `remote_video_track_updated`.
        let mut track_video_changes = room.remote_video_track_updates();
        let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_video_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_video_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Forward remote audio track changes to `remote_audio_track_updated`.
        let mut track_audio_changes = room.remote_audio_track_updates();
        let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_audio_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_audio_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Connect to LiveKit, then share the microphone; errors are only logged.
        let connect = room.connect(&connection_info.server_url, &connection_info.token);
        cx.spawn(|this, mut cx| async move {
            connect.await?;

            this.update(&mut cx, |this, cx| this.share_microphone(cx))
                .await?;

            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        // The maintenance tasks are stored alongside the LiveKit room.
        Some(LiveKitRoom {
            room,
            screen_track: LocalTrack::None,
            microphone_track: LocalTrack::None,
            next_publish_id: 0,
            muted_by_user: false,
            deafened: false,
            speaking: false,
            _maintain_room,
            _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
        })
    } else {
        None
    };

    // Watch the client's connection status and rejoin the room after reconnections.
    let maintain_connection =
        cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

    Audio::play_sound(Sound::Joined, cx);

    Self {
        id,
        live_kit: live_kit_room,
        status: RoomStatus::Online,
        shared_projects: Default::default(),
        joined_projects: Default::default(),
        participant_user_ids: Default::default(),
        local_participant: Default::default(),
        remote_participants: Default::default(),
        pending_participants: Default::default(),
        pending_call_count: 0,
        subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
        leave_when_empty: false,
        pending_room_update: None,
        client,
        user_store,
        follows_by_leader_id_project_id: Default::default(),
        maintain_connection: Some(maintain_connection),
    }
}
203
204 pub(crate) fn create(
205 called_user_id: u64,
206 initial_project: Option<ModelHandle<Project>>,
207 client: Arc<Client>,
208 user_store: ModelHandle<UserStore>,
209 cx: &mut AppContext,
210 ) -> Task<Result<ModelHandle<Self>>> {
211 cx.spawn(|mut cx| async move {
212 let response = client.request(proto::CreateRoom {}).await?;
213 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
214 let room = cx.add_model(|cx| {
215 Self::new(
216 room_proto.id,
217 response.live_kit_connection_info,
218 client,
219 user_store,
220 cx,
221 )
222 });
223
224 let initial_project_id = if let Some(initial_project) = initial_project {
225 let initial_project_id = room
226 .update(&mut cx, |room, cx| {
227 room.share_project(initial_project.clone(), cx)
228 })
229 .await?;
230 Some(initial_project_id)
231 } else {
232 None
233 };
234
235 match room
236 .update(&mut cx, |room, cx| {
237 room.leave_when_empty = true;
238 room.call(called_user_id, initial_project_id, cx)
239 })
240 .await
241 {
242 Ok(()) => Ok(room),
243 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
244 }
245 })
246 }
247
248 pub(crate) fn join(
249 call: &IncomingCall,
250 client: Arc<Client>,
251 user_store: ModelHandle<UserStore>,
252 cx: &mut AppContext,
253 ) -> Task<Result<ModelHandle<Self>>> {
254 let room_id = call.room_id;
255 cx.spawn(|mut cx| async move {
256 let response = client.request(proto::JoinRoom { id: room_id }).await?;
257 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
258 let room = cx.add_model(|cx| {
259 Self::new(
260 room_id,
261 response.live_kit_connection_info,
262 client,
263 user_store,
264 cx,
265 )
266 });
267 room.update(&mut cx, |room, cx| {
268 room.leave_when_empty = true;
269 room.apply_room_update(room_proto, cx)?;
270 anyhow::Ok(())
271 })?;
272
273 Ok(room)
274 })
275 }
276
277 fn should_leave(&self) -> bool {
278 self.leave_when_empty
279 && self.pending_room_update.is_none()
280 && self.pending_participants.is_empty()
281 && self.remote_participants.is_empty()
282 && self.pending_call_count == 0
283 }
284
285 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286 cx.notify();
287 cx.emit(Event::Left);
288 self.leave_internal(cx)
289 }
290
291 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
292 if self.status.is_offline() {
293 return Task::ready(Err(anyhow!("room is offline")));
294 }
295
296 log::info!("leaving room");
297
298 for project in self.shared_projects.drain() {
299 if let Some(project) = project.upgrade(cx) {
300 project.update(cx, |project, cx| {
301 project.unshare(cx).log_err();
302 });
303 }
304 }
305 for project in self.joined_projects.drain() {
306 if let Some(project) = project.upgrade(cx) {
307 project.update(cx, |project, cx| {
308 project.disconnected_from_host(cx);
309 project.close(cx);
310 });
311 }
312 }
313
314 Audio::play_sound(Sound::Leave, cx);
315
316 self.status = RoomStatus::Offline;
317 self.remote_participants.clear();
318 self.pending_participants.clear();
319 self.participant_user_ids.clear();
320 self.subscriptions.clear();
321 self.live_kit.take();
322 self.pending_room_update.take();
323 self.maintain_connection.take();
324
325 let leave_room = self.client.request(proto::LeaveRoom {});
326 cx.background().spawn(async move {
327 leave_room.await?;
328 anyhow::Ok(())
329 })
330 }
331
/// Watches the client's connection status for as long as the room exists and
/// tries to rejoin the room whenever the client reconnects.
///
/// This only ever returns an error: either the room model was dropped, or the
/// client could not reconnect and rejoin (within `RECONNECT_TIMEOUT`, within
/// three rejoin attempts, or because it signed out), in which case the room is
/// left first.
async fn maintain_connection(
    this: WeakModelHandle<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Discard a pending status value (if any) so that `next()` below waits for a later change.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        // NOTE(review): if the status stream ever ends while connected, `next()` yields `None`
        // and the loop comes straight back here — confirm the stream cannot end while this task runs.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                });

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                // Resolves to `true` once the room was rejoined, or `false` when the room
                // is gone, the client signed out, or all rejoin attempts failed.
                let client_reconnection = async {
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade(&cx) else { break };
                            if this
                                .update(&mut cx, |this, cx| this.rejoin(cx))
                                .await
                                .log_err()
                                .is_some()
                            {
                                return true;
                            } else {
                                remaining_attempts -= 1;
                            }
                        } else if client_status.borrow().is_signed_out() {
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // Race the reconnection against the timeout; the reconnection branch is listed first.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade(&cx) {
        log::info!("reconnection failed, leaving room");
        let _ = this.update(&mut cx, |this, cx| this.leave(cx));
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
417
418 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
419 let mut projects = HashMap::default();
420 let mut reshared_projects = Vec::new();
421 let mut rejoined_projects = Vec::new();
422 self.shared_projects.retain(|project| {
423 if let Some(handle) = project.upgrade(cx) {
424 let project = handle.read(cx);
425 if let Some(project_id) = project.remote_id() {
426 projects.insert(project_id, handle.clone());
427 reshared_projects.push(proto::UpdateProject {
428 project_id,
429 worktrees: project.worktree_metadata_protos(cx),
430 });
431 return true;
432 }
433 }
434 false
435 });
436 self.joined_projects.retain(|project| {
437 if let Some(handle) = project.upgrade(cx) {
438 let project = handle.read(cx);
439 if let Some(project_id) = project.remote_id() {
440 projects.insert(project_id, handle.clone());
441 rejoined_projects.push(proto::RejoinProject {
442 id: project_id,
443 worktrees: project
444 .worktrees(cx)
445 .map(|worktree| {
446 let worktree = worktree.read(cx);
447 proto::RejoinWorktree {
448 id: worktree.id().to_proto(),
449 scan_id: worktree.completed_scan_id() as u64,
450 }
451 })
452 .collect(),
453 });
454 }
455 return true;
456 }
457 false
458 });
459
460 let response = self.client.request_envelope(proto::RejoinRoom {
461 id: self.id,
462 reshared_projects,
463 rejoined_projects,
464 });
465
466 cx.spawn(|this, mut cx| async move {
467 let response = response.await?;
468 let message_id = response.message_id;
469 let response = response.payload;
470 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
471 this.update(&mut cx, |this, cx| {
472 this.status = RoomStatus::Online;
473 this.apply_room_update(room_proto, cx)?;
474
475 for reshared_project in response.reshared_projects {
476 if let Some(project) = projects.get(&reshared_project.id) {
477 project.update(cx, |project, cx| {
478 project.reshared(reshared_project, cx).log_err();
479 });
480 }
481 }
482
483 for rejoined_project in response.rejoined_projects {
484 if let Some(project) = projects.get(&rejoined_project.id) {
485 project.update(cx, |project, cx| {
486 project.rejoined(rejoined_project, message_id, cx).log_err();
487 });
488 }
489 }
490
491 anyhow::Ok(())
492 })
493 })
494 }
495
496 pub fn id(&self) -> u64 {
497 self.id
498 }
499
500 pub fn status(&self) -> RoomStatus {
501 self.status
502 }
503
504 pub fn local_participant(&self) -> &LocalParticipant {
505 &self.local_participant
506 }
507
508 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
509 &self.remote_participants
510 }
511
512 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
513 self.remote_participants
514 .values()
515 .find(|p| p.peer_id == peer_id)
516 }
517
518 pub fn pending_participants(&self) -> &[Arc<User>] {
519 &self.pending_participants
520 }
521
522 pub fn contains_participant(&self, user_id: u64) -> bool {
523 self.participant_user_ids.contains(&user_id)
524 }
525
526 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
527 self.follows_by_leader_id_project_id
528 .get(&(leader_id, project_id))
529 .map_or(&[], |v| v.as_slice())
530 }
531
532 async fn handle_room_updated(
533 this: ModelHandle<Self>,
534 envelope: TypedEnvelope<proto::RoomUpdated>,
535 _: Arc<Client>,
536 mut cx: AsyncAppContext,
537 ) -> Result<()> {
538 let room = envelope
539 .payload
540 .room
541 .ok_or_else(|| anyhow!("invalid room"))?;
542 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
543 }
544
/// Reconciles the room's participants, projects and followers with `room`.
///
/// The users referenced by `room` are first requested from the user store;
/// the actual reconciliation runs in a spawned task that is stored in
/// `pending_room_update` (replacing any task stored there before) and cleared
/// by that task once it has finished. As written, this function itself always
/// returns `Ok(())`.
fn apply_room_update(
    &mut self,
    mut room: proto::Room,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    // Filter ourselves out from the room's participants.
    let local_participant_ix = room
        .participants
        .iter()
        .position(|participant| Some(participant.user_id) == self.client.user_id());
    let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

    let pending_participant_user_ids = room
        .pending_participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    let remote_participant_user_ids = room
        .participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    // Request the `User` records for both groups of participants.
    let (remote_participants, pending_participants) =
        self.user_store.update(cx, move |user_store, cx| {
            (
                user_store.get_users(remote_participant_user_ids, cx),
                user_store.get_users(pending_participant_user_ids, cx),
            )
        });

    self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
        let (remote_participants, pending_participants) =
            futures::join!(remote_participants, pending_participants);

        this.update(&mut cx, |this, cx| {
            // Rebuilt from scratch below from the remote and pending participants.
            this.participant_user_ids.clear();

            if let Some(participant) = local_participant {
                this.local_participant.projects = participant.projects;
            } else {
                this.local_participant.projects.clear();
            }

            if let Some(participants) = remote_participants.log_err() {
                // Pairs each proto participant with the user at the same position;
                // assumes `get_users` preserves the order of the requested ids — TODO confirm.
                for (participant, user) in room.participants.into_iter().zip(participants) {
                    let Some(peer_id) = participant.peer_id else { continue };
                    this.participant_user_ids.insert(participant.user_id);

                    // Project ids this participant had before the update vs. the ones it has now.
                    let old_projects = this
                        .remote_participants
                        .get(&participant.user_id)
                        .into_iter()
                        .flat_map(|existing| &existing.projects)
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();
                    let new_projects = participant
                        .projects
                        .iter()
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();

                    for project in &participant.projects {
                        if !old_projects.contains(&project.id) {
                            cx.emit(Event::RemoteProjectShared {
                                owner: user.clone(),
                                project_id: project.id,
                                worktree_root_names: project.worktree_root_names.clone(),
                            });
                        }
                    }

                    for unshared_project_id in old_projects.difference(&new_projects) {
                        // Drop dead handles and disconnect the joined project that was just unshared.
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade(cx) {
                                project.update(cx, |project, cx| {
                                    if project.remote_id() == Some(*unshared_project_id) {
                                        project.disconnected_from_host(cx);
                                        false
                                    } else {
                                        true
                                    }
                                })
                            } else {
                                false
                            }
                        });
                        cx.emit(Event::RemoteProjectUnshared {
                            project_id: *unshared_project_id,
                        });
                    }

                    // A location that cannot be converted is treated as external.
                    let location = ParticipantLocation::from_proto(participant.location)
                        .unwrap_or(ParticipantLocation::External);
                    if let Some(remote_participant) =
                        this.remote_participants.get_mut(&participant.user_id)
                    {
                        // Known participant: refresh its state in place.
                        remote_participant.projects = participant.projects;
                        remote_participant.peer_id = peer_id;
                        if location != remote_participant.location {
                            remote_participant.location = location;
                            cx.emit(Event::ParticipantLocationChanged {
                                participant_id: peer_id,
                            });
                        }
                    } else {
                        // New participant: record it and play the "joined" sound.
                        this.remote_participants.insert(
                            participant.user_id,
                            RemoteParticipant {
                                user: user.clone(),
                                peer_id,
                                projects: participant.projects,
                                location,
                                muted: false,
                                speaking: false,
                                video_tracks: Default::default(),
                                audio_tracks: Default::default(),
                            },
                        );

                        Audio::play_sound(Sound::Joined, cx);

                        // Register the tracks LiveKit already reports for this user.
                        if let Some(live_kit) = this.live_kit.as_ref() {
                            let video_tracks =
                                live_kit.room.remote_video_tracks(&user.id.to_string());
                            let audio_tracks =
                                live_kit.room.remote_audio_tracks(&user.id.to_string());
                            for track in video_tracks {
                                this.remote_video_track_updated(
                                    RemoteVideoTrackUpdate::Subscribed(track),
                                    cx,
                                )
                                .log_err();
                            }
                            for track in audio_tracks {
                                this.remote_audio_track_updated(
                                    RemoteAudioTrackUpdate::Subscribed(track),
                                    cx,
                                )
                                .log_err();
                            }
                        }
                    }
                }

                // Remove participants that are no longer in the room, reporting
                // each of their projects as unshared.
                this.remote_participants.retain(|user_id, participant| {
                    if this.participant_user_ids.contains(user_id) {
                        true
                    } else {
                        for project in &participant.projects {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: project.id,
                            });
                        }
                        false
                    }
                });
            }

            if let Some(pending_participants) = pending_participants.log_err() {
                this.pending_participants = pending_participants;
                for participant in &this.pending_participants {
                    this.participant_user_ids.insert(participant.id);
                }
            }

            // Rebuild the follower lists, skipping entries that lack a leader or
            // follower id and ignoring duplicate followers.
            this.follows_by_leader_id_project_id.clear();
            for follower in room.followers {
                let project_id = follower.project_id;
                let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                    (Some(leader), Some(follower)) => (leader, follower),

                    _ => {
                        log::error!("Follower message {follower:?} missing some state");
                        continue;
                    }
                };

                let list = this
                    .follows_by_leader_id_project_id
                    .entry((leader, project_id))
                    .or_insert(Vec::new());
                if !list.contains(&follower) {
                    list.push(follower);
                }
            }

            // Clear the pending marker before `should_leave`, which requires it to be `None`.
            this.pending_room_update.take();
            if this.should_leave() {
                log::info!("room is empty, leaving");
                let _ = this.leave(cx);
            }

            this.check_invariants();
            cx.notify();
        });
    }));

    cx.notify();
    Ok(())
}
747
748 fn remote_video_track_updated(
749 &mut self,
750 change: RemoteVideoTrackUpdate,
751 cx: &mut ModelContext<Self>,
752 ) -> Result<()> {
753 match change {
754 RemoteVideoTrackUpdate::Subscribed(track) => {
755 let user_id = track.publisher_id().parse()?;
756 let track_id = track.sid().to_string();
757 let participant = self
758 .remote_participants
759 .get_mut(&user_id)
760 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
761 participant.video_tracks.insert(
762 track_id.clone(),
763 Arc::new(RemoteVideoTrack {
764 live_kit_track: track,
765 }),
766 );
767 cx.emit(Event::RemoteVideoTracksChanged {
768 participant_id: participant.peer_id,
769 });
770 }
771 RemoteVideoTrackUpdate::Unsubscribed {
772 publisher_id,
773 track_id,
774 } => {
775 let user_id = publisher_id.parse()?;
776 let participant = self
777 .remote_participants
778 .get_mut(&user_id)
779 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
780 participant.video_tracks.remove(&track_id);
781 cx.emit(Event::RemoteVideoTracksChanged {
782 participant_id: participant.peer_id,
783 });
784 }
785 }
786
787 cx.notify();
788 Ok(())
789 }
790
791 fn remote_audio_track_updated(
792 &mut self,
793 change: RemoteAudioTrackUpdate,
794 cx: &mut ModelContext<Self>,
795 ) -> Result<()> {
796 match change {
797 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
798 let mut speaker_ids = speakers
799 .into_iter()
800 .filter_map(|speaker_sid| speaker_sid.parse().ok())
801 .collect::<Vec<u64>>();
802 speaker_ids.sort_unstable();
803 for (sid, participant) in &mut self.remote_participants {
804 if let Ok(_) = speaker_ids.binary_search(sid) {
805 participant.speaking = true;
806 } else {
807 participant.speaking = false;
808 }
809 }
810 if let Some(id) = self.client.user_id() {
811 if let Some(room) = &mut self.live_kit {
812 if let Ok(_) = speaker_ids.binary_search(&id) {
813 room.speaking = true;
814 } else {
815 room.speaking = false;
816 }
817 }
818 }
819 cx.notify();
820 }
821 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
822 for participant in &mut self.remote_participants.values_mut() {
823 let mut found = false;
824 for track in participant.audio_tracks.values() {
825 if track.sid() == track_id {
826 found = true;
827 break;
828 }
829 }
830 if found {
831 participant.muted = muted;
832 break;
833 }
834 }
835 cx.notify();
836 }
837 RemoteAudioTrackUpdate::Subscribed(track) => {
838 let user_id = track.publisher_id().parse()?;
839 let track_id = track.sid().to_string();
840 let participant = self
841 .remote_participants
842 .get_mut(&user_id)
843 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
844 participant.audio_tracks.insert(track_id.clone(), track);
845 cx.emit(Event::RemoteAudioTracksChanged {
846 participant_id: participant.peer_id,
847 });
848 }
849 RemoteAudioTrackUpdate::Unsubscribed {
850 publisher_id,
851 track_id,
852 } => {
853 let user_id = publisher_id.parse()?;
854 let participant = self
855 .remote_participants
856 .get_mut(&user_id)
857 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
858 participant.audio_tracks.remove(&track_id);
859 cx.emit(Event::RemoteAudioTracksChanged {
860 participant_id: participant.peer_id,
861 });
862 }
863 }
864
865 cx.notify();
866 Ok(())
867 }
868
869 fn check_invariants(&self) {
870 #[cfg(any(test, feature = "test-support"))]
871 {
872 for participant in self.remote_participants.values() {
873 assert!(self.participant_user_ids.contains(&participant.user.id));
874 assert_ne!(participant.user.id, self.client.user_id().unwrap());
875 }
876
877 for participant in &self.pending_participants {
878 assert!(self.participant_user_ids.contains(&participant.id));
879 assert_ne!(participant.id, self.client.user_id().unwrap());
880 }
881
882 assert_eq!(
883 self.participant_user_ids.len(),
884 self.remote_participants.len() + self.pending_participants.len()
885 );
886 }
887 }
888
889 pub(crate) fn call(
890 &mut self,
891 called_user_id: u64,
892 initial_project_id: Option<u64>,
893 cx: &mut ModelContext<Self>,
894 ) -> Task<Result<()>> {
895 if self.status.is_offline() {
896 return Task::ready(Err(anyhow!("room is offline")));
897 }
898
899 cx.notify();
900 let client = self.client.clone();
901 let room_id = self.id;
902 self.pending_call_count += 1;
903 cx.spawn(|this, mut cx| async move {
904 let result = client
905 .request(proto::Call {
906 room_id,
907 called_user_id,
908 initial_project_id,
909 })
910 .await;
911 this.update(&mut cx, |this, cx| {
912 this.pending_call_count -= 1;
913 if this.should_leave() {
914 this.leave(cx).detach_and_log_err(cx);
915 }
916 });
917 result?;
918 Ok(())
919 })
920 }
921
922 pub fn join_project(
923 &mut self,
924 id: u64,
925 language_registry: Arc<LanguageRegistry>,
926 fs: Arc<dyn Fs>,
927 cx: &mut ModelContext<Self>,
928 ) -> Task<Result<ModelHandle<Project>>> {
929 let client = self.client.clone();
930 let user_store = self.user_store.clone();
931 cx.spawn(|this, mut cx| async move {
932 let project =
933 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
934
935 this.update(&mut cx, |this, cx| {
936 this.joined_projects.retain(|project| {
937 if let Some(project) = project.upgrade(cx) {
938 !project.read(cx).is_read_only()
939 } else {
940 false
941 }
942 });
943 this.joined_projects.insert(project.downgrade());
944 });
945 Ok(project)
946 })
947 }
948
949 pub(crate) fn share_project(
950 &mut self,
951 project: ModelHandle<Project>,
952 cx: &mut ModelContext<Self>,
953 ) -> Task<Result<u64>> {
954 if let Some(project_id) = project.read(cx).remote_id() {
955 return Task::ready(Ok(project_id));
956 }
957
958 let request = self.client.request(proto::ShareProject {
959 room_id: self.id(),
960 worktrees: project.read(cx).worktree_metadata_protos(cx),
961 });
962 cx.spawn(|this, mut cx| async move {
963 let response = request.await?;
964
965 project.update(&mut cx, |project, cx| {
966 project.shared(response.project_id, cx)
967 })?;
968
969 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
970 this.update(&mut cx, |this, cx| {
971 this.shared_projects.insert(project.downgrade());
972 let active_project = this.local_participant.active_project.as_ref();
973 if active_project.map_or(false, |location| *location == project) {
974 this.set_location(Some(&project), cx)
975 } else {
976 Task::ready(Ok(()))
977 }
978 })
979 .await?;
980
981 Ok(response.project_id)
982 })
983 }
984
985 pub(crate) fn unshare_project(
986 &mut self,
987 project: ModelHandle<Project>,
988 cx: &mut ModelContext<Self>,
989 ) -> Result<()> {
990 let project_id = match project.read(cx).remote_id() {
991 Some(project_id) => project_id,
992 None => return Ok(()),
993 };
994
995 self.client.send(proto::UnshareProject { project_id })?;
996 project.update(cx, |this, cx| this.unshare(cx))
997 }
998
999 pub(crate) fn set_location(
1000 &mut self,
1001 project: Option<&ModelHandle<Project>>,
1002 cx: &mut ModelContext<Self>,
1003 ) -> Task<Result<()>> {
1004 if self.status.is_offline() {
1005 return Task::ready(Err(anyhow!("room is offline")));
1006 }
1007
1008 let client = self.client.clone();
1009 let room_id = self.id;
1010 let location = if let Some(project) = project {
1011 self.local_participant.active_project = Some(project.downgrade());
1012 if let Some(project_id) = project.read(cx).remote_id() {
1013 proto::participant_location::Variant::SharedProject(
1014 proto::participant_location::SharedProject { id: project_id },
1015 )
1016 } else {
1017 proto::participant_location::Variant::UnsharedProject(
1018 proto::participant_location::UnsharedProject {},
1019 )
1020 }
1021 } else {
1022 self.local_participant.active_project = None;
1023 proto::participant_location::Variant::External(proto::participant_location::External {})
1024 };
1025
1026 cx.notify();
1027 cx.foreground().spawn(async move {
1028 client
1029 .request(proto::UpdateParticipantLocation {
1030 room_id,
1031 location: Some(proto::ParticipantLocation {
1032 variant: Some(location),
1033 }),
1034 })
1035 .await?;
1036 Ok(())
1037 })
1038 }
1039
1040 pub fn is_screen_sharing(&self) -> bool {
1041 self.live_kit.as_ref().map_or(false, |live_kit| {
1042 !matches!(live_kit.screen_track, LocalTrack::None)
1043 })
1044 }
1045
1046 pub fn is_sharing_mic(&self) -> bool {
1047 self.live_kit.as_ref().map_or(false, |live_kit| {
1048 !matches!(live_kit.microphone_track, LocalTrack::None)
1049 })
1050 }
1051
1052 pub fn is_muted(&self) -> bool {
1053 self.live_kit
1054 .as_ref()
1055 .and_then(|live_kit| match &live_kit.microphone_track {
1056 LocalTrack::None => None,
1057 LocalTrack::Pending { muted, .. } => Some(*muted),
1058 LocalTrack::Published { muted, .. } => Some(*muted),
1059 })
1060 .unwrap_or(false)
1061 }
1062
1063 pub fn is_speaking(&self) -> bool {
1064 self.live_kit
1065 .as_ref()
1066 .map_or(false, |live_kit| live_kit.speaking)
1067 }
1068
1069 pub fn is_deafened(&self) -> Option<bool> {
1070 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1071 }
1072
/// Publishes a local audio track for the microphone to the LiveKit room.
///
/// The microphone track is marked `Pending` with a fresh publish id right
/// away and becomes `Published` once publishing succeeds. If the track state
/// no longer carries that publish id when publishing finishes, the attempt
/// counts as canceled: a successful publication is unpublished again and a
/// failure is swallowed.
///
/// Fails immediately when the room is offline, when a microphone track is
/// already pending or published, or when LiveKit was not initialized.
pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    if self.status.is_offline() {
        return Task::ready(Err(anyhow!("room is offline")));
    } else if self.is_sharing_mic() {
        return Task::ready(Err(anyhow!("microphone was already shared")));
    }

    // Reserve a publish id and mark the track as pending before doing any async work.
    let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
        let publish_id = post_inc(&mut live_kit.next_publish_id);
        live_kit.microphone_track = LocalTrack::Pending {
            publish_id,
            muted: false,
        };
        cx.notify();
        publish_id
    } else {
        return Task::ready(Err(anyhow!("live-kit was not initialized")));
    };

    cx.spawn_weak(|this, mut cx| async move {
        let publish_track = async {
            let track = LocalAudioTrack::create();
            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .read_with(&cx, |this, _| {
                    this.live_kit
                        .as_ref()
                        .map(|live_kit| live_kit.room.publish_audio_track(&track))
                })
                .ok_or_else(|| anyhow!("live-kit was not initialized"))?
                .await
        };

        let publication = publish_track.await;
        this.upgrade(&cx)
            .ok_or_else(|| anyhow!("room was dropped"))?
            .update(&mut cx, |this, cx| {
                let live_kit = this
                    .live_kit
                    .as_mut()
                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

                // The attempt is canceled unless the track is still pending with our
                // publish id; `muted` is the mute flag stored on the pending track.
                let (canceled, muted) = if let LocalTrack::Pending {
                    publish_id: cur_publish_id,
                    muted,
                } = &live_kit.microphone_track
                {
                    (*cur_publish_id != publish_id, *muted)
                } else {
                    (true, false)
                };

                match publication {
                    Ok(publication) => {
                        if canceled {
                            live_kit.room.unpublish_track(publication);
                        } else {
                            // Carry the pending track's mute flag over to the publication.
                            if muted {
                                cx.background().spawn(publication.set_mute(muted)).detach();
                            }
                            live_kit.microphone_track = LocalTrack::Published {
                                track_publication: publication,
                                muted,
                            };
                            cx.notify();
                        }
                        Ok(())
                    }
                    Err(error) => {
                        if canceled {
                            Ok(())
                        } else {
                            // Reset the state so that sharing can be attempted again.
                            live_kit.microphone_track = LocalTrack::None;
                            cx.notify();
                            Err(error)
                        }
                    }
                }
            })
    })
}
1154
1155 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1156 if self.status.is_offline() {
1157 return Task::ready(Err(anyhow!("room is offline")));
1158 } else if self.is_screen_sharing() {
1159 return Task::ready(Err(anyhow!("screen was already shared")));
1160 }
1161
1162 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1163 let publish_id = post_inc(&mut live_kit.next_publish_id);
1164 live_kit.screen_track = LocalTrack::Pending {
1165 publish_id,
1166 muted: false,
1167 };
1168 cx.notify();
1169 (live_kit.room.display_sources(), publish_id)
1170 } else {
1171 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1172 };
1173
1174 cx.spawn_weak(|this, mut cx| async move {
1175 let publish_track = async {
1176 let displays = displays.await?;
1177 let display = displays
1178 .first()
1179 .ok_or_else(|| anyhow!("no display found"))?;
1180 let track = LocalVideoTrack::screen_share_for_display(&display);
1181 this.upgrade(&cx)
1182 .ok_or_else(|| anyhow!("room was dropped"))?
1183 .read_with(&cx, |this, _| {
1184 this.live_kit
1185 .as_ref()
1186 .map(|live_kit| live_kit.room.publish_video_track(&track))
1187 })
1188 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1189 .await
1190 };
1191
1192 let publication = publish_track.await;
1193 this.upgrade(&cx)
1194 .ok_or_else(|| anyhow!("room was dropped"))?
1195 .update(&mut cx, |this, cx| {
1196 let live_kit = this
1197 .live_kit
1198 .as_mut()
1199 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1200
1201 let (canceled, muted) = if let LocalTrack::Pending {
1202 publish_id: cur_publish_id,
1203 muted,
1204 } = &live_kit.screen_track
1205 {
1206 (*cur_publish_id != publish_id, *muted)
1207 } else {
1208 (true, false)
1209 };
1210
1211 match publication {
1212 Ok(publication) => {
1213 if canceled {
1214 live_kit.room.unpublish_track(publication);
1215 } else {
1216 if muted {
1217 cx.background().spawn(publication.set_mute(muted)).detach();
1218 }
1219 live_kit.screen_track = LocalTrack::Published {
1220 track_publication: publication,
1221 muted,
1222 };
1223 cx.notify();
1224 }
1225
1226 Audio::play_sound(Sound::StartScreenshare, cx);
1227
1228 Ok(())
1229 }
1230 Err(error) => {
1231 if canceled {
1232 Ok(())
1233 } else {
1234 live_kit.screen_track = LocalTrack::None;
1235 cx.notify();
1236 Err(error)
1237 }
1238 }
1239 }
1240 })
1241 })
1242 }
1243
1244 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1245 let should_mute = !self.is_muted();
1246 if let Some(live_kit) = self.live_kit.as_mut() {
1247 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1248 live_kit.muted_by_user = should_mute;
1249
1250 if old_muted == true && live_kit.deafened == true {
1251 if let Some(task) = self.toggle_deafen(cx).ok() {
1252 task.detach();
1253 }
1254 }
1255
1256 Ok(ret_task)
1257 } else {
1258 Err(anyhow!("LiveKit not started"))
1259 }
1260 }
1261
1262 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1263 if let Some(live_kit) = self.live_kit.as_mut() {
1264 (*live_kit).deafened = !live_kit.deafened;
1265
1266 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1267 // Context notification is sent within set_mute itself.
1268 let mut mute_task = None;
1269 // When deafening, mute user's mic as well.
1270 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1271 if live_kit.deafened || !live_kit.muted_by_user {
1272 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1273 };
1274 for participant in self.remote_participants.values() {
1275 for track in live_kit
1276 .room
1277 .remote_audio_track_publications(&participant.user.id.to_string())
1278 {
1279 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1280 }
1281 }
1282
1283 Ok(cx.foreground().spawn(async move {
1284 if let Some(mute_task) = mute_task {
1285 mute_task.await?;
1286 }
1287 for task in tasks {
1288 task.await?;
1289 }
1290 Ok(())
1291 }))
1292 } else {
1293 Err(anyhow!("LiveKit not started"))
1294 }
1295 }
1296
1297 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1298 if self.status.is_offline() {
1299 return Err(anyhow!("room is offline"));
1300 }
1301
1302 let live_kit = self
1303 .live_kit
1304 .as_mut()
1305 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1306 match mem::take(&mut live_kit.screen_track) {
1307 LocalTrack::None => Err(anyhow!("screen was not shared")),
1308 LocalTrack::Pending { .. } => {
1309 cx.notify();
1310 Ok(())
1311 }
1312 LocalTrack::Published {
1313 track_publication, ..
1314 } => {
1315 live_kit.room.unpublish_track(track_publication);
1316 cx.notify();
1317
1318 Audio::play_sound(Sound::StopScreenshare, cx);
1319 Ok(())
1320 }
1321 }
1322 }
1323
1324 #[cfg(any(test, feature = "test-support"))]
1325 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1326 self.live_kit
1327 .as_ref()
1328 .unwrap()
1329 .room
1330 .set_display_sources(sources);
1331 }
1332}
1333
/// State of the room's LiveKit connection: the client room handle, the locally
/// published tracks, and the local mute/deafen flags.
struct LiveKitRoom {
    /// Handle to the LiveKit client room, used to publish and unpublish tracks
    /// and to look up remote audio track publications.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Whether the microphone mute was requested manually by the user, as opposed
    /// to being applied automatically by deafening. Consulted when undeafening to
    /// decide whether the microphone should be unmuted again.
    muted_by_user: bool,
    /// Whether the local user is deafened, i.e. remote audio tracks are disabled.
    deafened: bool,
    // NOTE(review): not read or written in this part of the file; presumably
    // reflects whether the local user is currently speaking — confirm.
    speaking: bool,
    /// Id handed out (and then incremented) whenever a publish request starts, so
    /// a completing request can tell whether it is still the current one.
    next_publish_id: usize,
    // NOTE(review): the tasks below are only stored, never read here; presumably
    // they are held so the background work lives as long as this struct — confirm.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1346
1347impl LiveKitRoom {
1348 fn set_mute(
1349 self: &mut LiveKitRoom,
1350 should_mute: bool,
1351 cx: &mut ModelContext<Room>,
1352 ) -> Result<(Task<Result<()>>, bool)> {
1353 if !should_mute {
1354 // clear user muting state.
1355 self.muted_by_user = false;
1356 }
1357
1358 let (result, old_muted) = match &mut self.microphone_track {
1359 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1360 LocalTrack::Pending { muted, .. } => {
1361 let old_muted = *muted;
1362 *muted = should_mute;
1363 cx.notify();
1364 Ok((Task::Ready(Some(Ok(()))), old_muted))
1365 }
1366 LocalTrack::Published {
1367 track_publication,
1368 muted,
1369 } => {
1370 let old_muted = *muted;
1371 *muted = should_mute;
1372 cx.notify();
1373 Ok((
1374 cx.background().spawn(track_publication.set_mute(*muted)),
1375 old_muted,
1376 ))
1377 }
1378 }?;
1379
1380 if old_muted != should_mute {
1381 if should_mute {
1382 Audio::play_sound(Sound::Mute, cx);
1383 } else {
1384 Audio::play_sound(Sound::Unmute, cx);
1385 }
1386 }
1387
1388 Ok((result, old_muted))
1389 }
1390}
1391
/// Publication state of a local track (microphone or screen share).
enum LocalTrack {
    /// The track is not shared and no publish request is in flight.
    None,
    /// A publish request has been started but has not completed yet.
    Pending {
        /// Id of the in-flight request; compared on completion to detect that the
        /// request was canceled or superseded in the meantime.
        publish_id: usize,
        /// Mute state requested while pending; applied once the track is published.
        muted: bool,
    },
    /// The track has been published to the room.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Mute state last recorded for the track.
        muted: bool,
    },
}
1403
1404impl Default for LocalTrack {
1405 fn default() -> Self {
1406 Self::None
1407 }
1408}
1409
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    /// The room is trying to rejoin.
    Rejoining,
    /// The room is offline; room operations such as sharing are rejected.
    Offline,
}
1416
1417impl RoomStatus {
1418 pub fn is_offline(&self) -> bool {
1419 matches!(self, RoomStatus::Offline)
1420 }
1421
1422 pub fn is_online(&self) -> bool {
1423 matches!(self, RoomStatus::Online)
1424 }
1425}