1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
/// Events emitted by a `Room` model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A known remote participant's location differs from the one previously
    /// recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track of the given remote participant was subscribed to or
    /// unsubscribed from.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// An audio track of the given remote participant was subscribed to or
    /// unsubscribed from.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant is now sharing a project that was not in their
    /// previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously known remote project is no longer shared, or its owner is
    /// no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room (emitted by `Room::leave`).
    Left,
}
49
/// Client-side state of a call room: its participants, the projects shared or
/// joined through it, and the optional LiveKit audio/video session.
pub struct Room {
    // Room id, taken from the server response or the incoming call.
    id: u64,
    // `Some` when the room was entered through `join_channel`.
    channel_id: Option<u64>,
    // LiveKit session state; `None` when no connection info was supplied, and
    // taken (set to `None`) when leaving.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    // Local projects shared into this room (see `share_project`).
    shared_projects: HashSet<WeakModelHandle<Project>>,
    // Remote projects joined through this room (see `join_project`).
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    // Remote participants, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    // Users listed as pending in the most recent room update.
    pending_participants: Vec<Arc<User>>,
    // User ids of all remote and pending participants (see `check_invariants`).
    participant_user_ids: HashSet<u64>,
    // Number of `call` requests that have not completed yet.
    pending_call_count: usize,
    // When set, the room leaves itself once `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    // Followers for each (leader peer id, project id) pair.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Message-handler subscriptions (room updates); cleared when leaving.
    subscriptions: Vec<client::Subscription>,
    // Task applying the latest room update; `Some` while it is in flight.
    pending_room_update: Option<Task<()>>,
    // Task running `maintain_connection`; taken when leaving.
    maintain_connection: Option<Task<Option<()>>>,
}
70
71impl Entity for Room {
72 type Event = Event;
73
74 fn release(&mut self, cx: &mut AppContext) {
75 if self.status.is_online() {
76 self.leave_internal(cx).detach_and_log_err(cx);
77 }
78 }
79
80 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
81 if self.status.is_online() {
82 let leave = self.leave_internal(cx);
83 Some(
84 cx.background()
85 .spawn(async move {
86 leave.await.log_err();
87 })
88 .boxed(),
89 )
90 } else {
91 None
92 }
93 }
94}
95
96impl Room {
/// Returns the id of the channel this room belongs to, if it was entered
/// through a channel.
pub fn channel_id(&self) -> Option<u64> {
    self.channel_id
}
100
/// Test-only helper: `true` when a LiveKit session exists and its current
/// status is `Connected`.
#[cfg(any(test, feature = "test-support"))]
pub fn is_connected(&self) -> bool {
    self.live_kit.as_ref().map_or(false, |live_kit| {
        matches!(
            *live_kit.room.status().borrow(),
            live_kit_client::ConnectionState::Connected { .. }
        )
    })
}
112
/// Builds the room model.
///
/// When `live_kit_connection_info` is provided, a LiveKit room is created,
/// tasks are spawned to follow its connection status and remote video/audio
/// track updates, and a connection attempt is started. A task running
/// `maintain_connection` is always spawned, a handler for room-update
/// messages is registered on `client`, and the "joined" sound is played.
fn new(
    id: u64,
    channel_id: Option<u64>,
    live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    cx: &mut ModelContext<Self>,
) -> Self {
    let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
        let room = live_kit_client::Room::new();
        let mut status = room.status();
        // Consume the initial status of the room.
        let _ = status.try_recv();
        // Leave the room as soon as LiveKit reports a disconnection. The task
        // only holds a weak handle and stops once the model is gone.
        let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
            while let Some(status) = status.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                if status == live_kit_client::ConnectionState::Disconnected {
                    this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                    break;
                }
            }
        });

        // Forward remote video track changes to the model.
        let mut track_video_changes = room.remote_video_track_updates();
        let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_video_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_video_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Forward remote audio track changes to the model.
        let mut track_audio_changes = room.remote_audio_track_updates();
        let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_audio_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_audio_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Connect to LiveKit; once connected, publish the microphone unless
        // the `mute_on_join` call setting is enabled.
        let connect = room.connect(&connection_info.server_url, &connection_info.token);
        cx.spawn(|this, mut cx| async move {
            connect.await?;

            if !cx.read(|cx| settings::get::<CallSettings>(cx).mute_on_join) {
                this.update(&mut cx, |this, cx| this.share_microphone(cx))
                    .await?;
            }

            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        // The maintenance tasks are stored on the `LiveKitRoom` itself.
        Some(LiveKitRoom {
            room,
            screen_track: LocalTrack::None,
            microphone_track: LocalTrack::None,
            next_publish_id: 0,
            muted_by_user: false,
            deafened: false,
            speaking: false,
            _maintain_room,
            _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
        })
    } else {
        None
    };

    // Watch the client's connection status and rejoin the room after drops.
    let maintain_connection =
        cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

    Audio::play_sound(Sound::Joined, cx);

    Self {
        id,
        channel_id,
        live_kit: live_kit_room,
        status: RoomStatus::Online,
        shared_projects: Default::default(),
        joined_projects: Default::default(),
        participant_user_ids: Default::default(),
        local_participant: Default::default(),
        remote_participants: Default::default(),
        pending_participants: Default::default(),
        pending_call_count: 0,
        subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
        leave_when_empty: false,
        pending_room_update: None,
        client,
        user_store,
        follows_by_leader_id_project_id: Default::default(),
        maintain_connection: Some(maintain_connection),
    }
}
225
226 pub(crate) fn create(
227 called_user_id: u64,
228 initial_project: Option<ModelHandle<Project>>,
229 client: Arc<Client>,
230 user_store: ModelHandle<UserStore>,
231 cx: &mut AppContext,
232 ) -> Task<Result<ModelHandle<Self>>> {
233 cx.spawn(|mut cx| async move {
234 let response = client.request(proto::CreateRoom {}).await?;
235 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
236 let room = cx.add_model(|cx| {
237 Self::new(
238 room_proto.id,
239 None,
240 response.live_kit_connection_info,
241 client,
242 user_store,
243 cx,
244 )
245 });
246
247 let initial_project_id = if let Some(initial_project) = initial_project {
248 let initial_project_id = room
249 .update(&mut cx, |room, cx| {
250 room.share_project(initial_project.clone(), cx)
251 })
252 .await?;
253 Some(initial_project_id)
254 } else {
255 None
256 };
257
258 match room
259 .update(&mut cx, |room, cx| {
260 room.leave_when_empty = true;
261 room.call(called_user_id, initial_project_id, cx)
262 })
263 .await
264 {
265 Ok(()) => Ok(room),
266 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
267 }
268 })
269 }
270
271 pub(crate) fn join_channel(
272 channel_id: u64,
273 client: Arc<Client>,
274 user_store: ModelHandle<UserStore>,
275 cx: &mut AppContext,
276 ) -> Task<Result<ModelHandle<Self>>> {
277 cx.spawn(|mut cx| async move {
278 let response = client.request(proto::JoinChannel { channel_id }).await?;
279 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
280 let room = cx.add_model(|cx| {
281 Self::new(
282 room_proto.id,
283 Some(channel_id),
284 response.live_kit_connection_info,
285 client,
286 user_store,
287 cx,
288 )
289 });
290
291 room.update(&mut cx, |room, cx| {
292 room.apply_room_update(room_proto, cx)?;
293 anyhow::Ok(())
294 })?;
295
296 Ok(room)
297 })
298 }
299
300 pub(crate) fn join(
301 call: &IncomingCall,
302 client: Arc<Client>,
303 user_store: ModelHandle<UserStore>,
304 cx: &mut AppContext,
305 ) -> Task<Result<ModelHandle<Self>>> {
306 let room_id = call.room_id;
307 cx.spawn(|mut cx| async move {
308 let response = client.request(proto::JoinRoom { id: room_id }).await?;
309 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
310 let room = cx.add_model(|cx| {
311 Self::new(
312 room_id,
313 None,
314 response.live_kit_connection_info,
315 client,
316 user_store,
317 cx,
318 )
319 });
320 room.update(&mut cx, |room, cx| {
321 room.leave_when_empty = true;
322 room.apply_room_update(room_proto, cx)?;
323 anyhow::Ok(())
324 })?;
325
326 Ok(room)
327 })
328 }
329
330 fn should_leave(&self) -> bool {
331 self.leave_when_empty
332 && self.pending_room_update.is_none()
333 && self.pending_participants.is_empty()
334 && self.remote_participants.is_empty()
335 && self.pending_call_count == 0
336 }
337
/// Leaves the room: notifies observers, emits `Event::Left`, and then tears
/// the room down via `leave_internal`, returning its task.
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    cx.notify();
    cx.emit(Event::Left);
    self.leave_internal(cx)
}
343
344 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
345 if self.status.is_offline() {
346 return Task::ready(Err(anyhow!("room is offline")));
347 }
348
349 log::info!("leaving room");
350
351 for project in self.shared_projects.drain() {
352 if let Some(project) = project.upgrade(cx) {
353 project.update(cx, |project, cx| {
354 project.unshare(cx).log_err();
355 });
356 }
357 }
358 for project in self.joined_projects.drain() {
359 if let Some(project) = project.upgrade(cx) {
360 project.update(cx, |project, cx| {
361 project.disconnected_from_host(cx);
362 project.close(cx);
363 });
364 }
365 }
366
367 Audio::play_sound(Sound::Leave, cx);
368
369 self.status = RoomStatus::Offline;
370 self.remote_participants.clear();
371 self.pending_participants.clear();
372 self.participant_user_ids.clear();
373 self.subscriptions.clear();
374 self.live_kit.take();
375 self.pending_room_update.take();
376 self.maintain_connection.take();
377
378 let leave_room = self.client.request(proto::LeaveRoom {});
379 cx.background().spawn(async move {
380 leave_room.await?;
381 anyhow::Ok(())
382 })
383 }
384
/// Watches the client's connection status for as long as the room exists.
///
/// On any detected disconnection the room is marked `Rejoining`, and the
/// function waits (up to `RECONNECT_TIMEOUT`) for the client to reconnect,
/// attempting to rejoin the room up to three times. On success it resumes
/// watching. Otherwise the room is left and an error is returned. The
/// function also returns an error if the room model has been dropped.
async fn maintain_connection(
    this: WeakModelHandle<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Discard a status value that is already available, so the `next()`
        // below waits for a subsequent change.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                });

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                // Resolves to `true` once the room was rejoined, or `false`
                // when the client signed out, the room was dropped, or all
                // rejoin attempts failed.
                let client_reconnection = async {
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade(&cx) else { break };
                            if this
                                .update(&mut cx, |this, cx| this.rejoin(cx))
                                .await
                                .log_err()
                                .is_some()
                            {
                                return true;
                            } else {
                                remaining_attempts -= 1;
                            }
                        } else if client_status.borrow().is_signed_out() {
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // Biased select: the reconnection future is polled before the
                // timeout.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade(&cx) {
        log::info!("reconnection failed, leaving room");
        let _ = this.update(&mut cx, |this, cx| this.leave(cx));
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
470
471 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
472 let mut projects = HashMap::default();
473 let mut reshared_projects = Vec::new();
474 let mut rejoined_projects = Vec::new();
475 self.shared_projects.retain(|project| {
476 if let Some(handle) = project.upgrade(cx) {
477 let project = handle.read(cx);
478 if let Some(project_id) = project.remote_id() {
479 projects.insert(project_id, handle.clone());
480 reshared_projects.push(proto::UpdateProject {
481 project_id,
482 worktrees: project.worktree_metadata_protos(cx),
483 });
484 return true;
485 }
486 }
487 false
488 });
489 self.joined_projects.retain(|project| {
490 if let Some(handle) = project.upgrade(cx) {
491 let project = handle.read(cx);
492 if let Some(project_id) = project.remote_id() {
493 projects.insert(project_id, handle.clone());
494 rejoined_projects.push(proto::RejoinProject {
495 id: project_id,
496 worktrees: project
497 .worktrees(cx)
498 .map(|worktree| {
499 let worktree = worktree.read(cx);
500 proto::RejoinWorktree {
501 id: worktree.id().to_proto(),
502 scan_id: worktree.completed_scan_id() as u64,
503 }
504 })
505 .collect(),
506 });
507 }
508 return true;
509 }
510 false
511 });
512
513 let response = self.client.request_envelope(proto::RejoinRoom {
514 id: self.id,
515 reshared_projects,
516 rejoined_projects,
517 });
518
519 cx.spawn(|this, mut cx| async move {
520 let response = response.await?;
521 let message_id = response.message_id;
522 let response = response.payload;
523 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
524 this.update(&mut cx, |this, cx| {
525 this.status = RoomStatus::Online;
526 this.apply_room_update(room_proto, cx)?;
527
528 for reshared_project in response.reshared_projects {
529 if let Some(project) = projects.get(&reshared_project.id) {
530 project.update(cx, |project, cx| {
531 project.reshared(reshared_project, cx).log_err();
532 });
533 }
534 }
535
536 for rejoined_project in response.rejoined_projects {
537 if let Some(project) = projects.get(&rejoined_project.id) {
538 project.update(cx, |project, cx| {
539 project.rejoined(rejoined_project, message_id, cx).log_err();
540 });
541 }
542 }
543
544 anyhow::Ok(())
545 })
546 })
547 }
548
/// Returns the room's id.
pub fn id(&self) -> u64 {
    self.id
}
552
/// Returns the room's current status.
pub fn status(&self) -> RoomStatus {
    self.status
}
556
/// Returns the state tracked for the local user.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
560
/// Returns the remote participants currently in the room, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
564
565 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
566 self.remote_participants
567 .values()
568 .find(|p| p.peer_id == peer_id)
569 }
570
571 pub fn pending_participants(&self) -> &[Arc<User>] {
572 &self.pending_participants
573 }
574
/// Returns `true` when `user_id` belongs to a remote or pending participant
/// of this room.
pub fn contains_participant(&self, user_id: u64) -> bool {
    self.participant_user_ids.contains(&user_id)
}
578
579 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
580 self.follows_by_leader_id_project_id
581 .get(&(leader_id, project_id))
582 .map_or(&[], |v| v.as_slice())
583 }
584
585 async fn handle_room_updated(
586 this: ModelHandle<Self>,
587 envelope: TypedEnvelope<proto::RoomUpdated>,
588 _: Arc<Client>,
589 mut cx: AsyncAppContext,
590 ) -> Result<()> {
591 let room = envelope
592 .payload
593 .room
594 .ok_or_else(|| anyhow!("invalid room"))?;
595 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
596 }
597
/// Applies a room snapshot received from the server.
///
/// The users behind the remote and pending participants are requested from
/// the user store; once both requests finish, a spawned task (stored in
/// `pending_room_update`) reconciles local state with the snapshot:
/// participants, their projects and locations, LiveKit tracks for newly
/// seen participants, pending participants, and followers. It emits the
/// corresponding events and leaves the room if `should_leave` holds
/// afterwards. This function itself returns `Ok(())` right after scheduling
/// that task.
fn apply_room_update(
    &mut self,
    mut room: proto::Room,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    // Filter ourselves out from the room's participants.
    let local_participant_ix = room
        .participants
        .iter()
        .position(|participant| Some(participant.user_id) == self.client.user_id());
    let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

    let pending_participant_user_ids = room
        .pending_participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    let remote_participant_user_ids = room
        .participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    // Start both user requests now; they are awaited inside the task below.
    let (remote_participants, pending_participants) =
        self.user_store.update(cx, move |user_store, cx| {
            (
                user_store.get_users(remote_participant_user_ids, cx),
                user_store.get_users(pending_participant_user_ids, cx),
            )
        });

    self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
        let (remote_participants, pending_participants) =
            futures::join!(remote_participants, pending_participants);

        this.update(&mut cx, |this, cx| {
            // Rebuilt below from the remote and pending participants.
            this.participant_user_ids.clear();

            if let Some(participant) = local_participant {
                this.local_participant.projects = participant.projects;
            } else {
                this.local_participant.projects.clear();
            }

            if let Some(participants) = remote_participants.log_err() {
                // NOTE(review): pairing by `zip` assumes `get_users` returns users in
                // the same order as the requested ids — confirm against `UserStore`.
                for (participant, user) in room.participants.into_iter().zip(participants) {
                    let Some(peer_id) = participant.peer_id else { continue };
                    this.participant_user_ids.insert(participant.user_id);

                    // Diff the participant's project list against what we knew before.
                    let old_projects = this
                        .remote_participants
                        .get(&participant.user_id)
                        .into_iter()
                        .flat_map(|existing| &existing.projects)
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();
                    let new_projects = participant
                        .projects
                        .iter()
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();

                    for project in &participant.projects {
                        if !old_projects.contains(&project.id) {
                            cx.emit(Event::RemoteProjectShared {
                                owner: user.clone(),
                                project_id: project.id,
                                worktree_root_names: project.worktree_root_names.clone(),
                            });
                        }
                    }

                    for unshared_project_id in old_projects.difference(&new_projects) {
                        // Disconnect and forget any joined project that was just
                        // unshared; dead handles are dropped as well.
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade(cx) {
                                project.update(cx, |project, cx| {
                                    if project.remote_id() == Some(*unshared_project_id) {
                                        project.disconnected_from_host(cx);
                                        false
                                    } else {
                                        true
                                    }
                                })
                            } else {
                                false
                            }
                        });
                        cx.emit(Event::RemoteProjectUnshared {
                            project_id: *unshared_project_id,
                        });
                    }

                    // A location that fails to convert is treated as `External`.
                    let location = ParticipantLocation::from_proto(participant.location)
                        .unwrap_or(ParticipantLocation::External);
                    if let Some(remote_participant) =
                        this.remote_participants.get_mut(&participant.user_id)
                    {
                        remote_participant.projects = participant.projects;
                        remote_participant.peer_id = peer_id;
                        if location != remote_participant.location {
                            remote_participant.location = location;
                            cx.emit(Event::ParticipantLocationChanged {
                                participant_id: peer_id,
                            });
                        }
                    } else {
                        // First time we see this participant. They start out as
                        // muted until an audio track says otherwise.
                        this.remote_participants.insert(
                            participant.user_id,
                            RemoteParticipant {
                                user: user.clone(),
                                peer_id,
                                projects: participant.projects,
                                location,
                                muted: true,
                                speaking: false,
                                video_tracks: Default::default(),
                                audio_tracks: Default::default(),
                            },
                        );

                        Audio::play_sound(Sound::Joined, cx);

                        // Pick up tracks LiveKit already holds for this user.
                        if let Some(live_kit) = this.live_kit.as_ref() {
                            let video_tracks =
                                live_kit.room.remote_video_tracks(&user.id.to_string());
                            let audio_tracks =
                                live_kit.room.remote_audio_tracks(&user.id.to_string());
                            let publications = live_kit
                                .room
                                .remote_audio_track_publications(&user.id.to_string());

                            for track in video_tracks {
                                this.remote_video_track_updated(
                                    RemoteVideoTrackUpdate::Subscribed(track),
                                    cx,
                                )
                                .log_err();
                            }

                            for (track, publication) in
                                audio_tracks.iter().zip(publications.iter())
                            {
                                this.remote_audio_track_updated(
                                    RemoteAudioTrackUpdate::Subscribed(
                                        track.clone(),
                                        publication.clone(),
                                    ),
                                    cx,
                                )
                                .log_err();
                            }
                        }
                    }
                }

                // Drop participants missing from this update and report their
                // projects as unshared.
                this.remote_participants.retain(|user_id, participant| {
                    if this.participant_user_ids.contains(user_id) {
                        true
                    } else {
                        for project in &participant.projects {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: project.id,
                            });
                        }
                        false
                    }
                });
            }

            if let Some(pending_participants) = pending_participants.log_err() {
                this.pending_participants = pending_participants;
                for participant in &this.pending_participants {
                    this.participant_user_ids.insert(participant.id);
                }
            }

            // Rebuild the follower index from scratch, skipping duplicates.
            this.follows_by_leader_id_project_id.clear();
            for follower in room.followers {
                let project_id = follower.project_id;
                let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                    (Some(leader), Some(follower)) => (leader, follower),

                    _ => {
                        log::error!("Follower message {follower:?} missing some state");
                        continue;
                    }
                };

                let list = this
                    .follows_by_leader_id_project_id
                    .entry((leader, project_id))
                    .or_insert(Vec::new());
                if !list.contains(&follower) {
                    list.push(follower);
                }
            }

            // Clear the in-flight marker first: `should_leave` requires that no
            // room update is pending.
            this.pending_room_update.take();
            if this.should_leave() {
                log::info!("room is empty, leaving");
                let _ = this.leave(cx);
            }

            this.check_invariants();
            cx.notify();
        });
    }));

    cx.notify();
    Ok(())
}
810
811 fn remote_video_track_updated(
812 &mut self,
813 change: RemoteVideoTrackUpdate,
814 cx: &mut ModelContext<Self>,
815 ) -> Result<()> {
816 match change {
817 RemoteVideoTrackUpdate::Subscribed(track) => {
818 let user_id = track.publisher_id().parse()?;
819 let track_id = track.sid().to_string();
820 let participant = self
821 .remote_participants
822 .get_mut(&user_id)
823 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
824 participant.video_tracks.insert(
825 track_id.clone(),
826 Arc::new(RemoteVideoTrack {
827 live_kit_track: track,
828 }),
829 );
830 cx.emit(Event::RemoteVideoTracksChanged {
831 participant_id: participant.peer_id,
832 });
833 }
834 RemoteVideoTrackUpdate::Unsubscribed {
835 publisher_id,
836 track_id,
837 } => {
838 let user_id = publisher_id.parse()?;
839 let participant = self
840 .remote_participants
841 .get_mut(&user_id)
842 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
843 participant.video_tracks.remove(&track_id);
844 cx.emit(Event::RemoteVideoTracksChanged {
845 participant_id: participant.peer_id,
846 });
847 }
848 }
849
850 cx.notify();
851 Ok(())
852 }
853
854 fn remote_audio_track_updated(
855 &mut self,
856 change: RemoteAudioTrackUpdate,
857 cx: &mut ModelContext<Self>,
858 ) -> Result<()> {
859 match change {
860 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
861 let mut speaker_ids = speakers
862 .into_iter()
863 .filter_map(|speaker_sid| speaker_sid.parse().ok())
864 .collect::<Vec<u64>>();
865 speaker_ids.sort_unstable();
866 for (sid, participant) in &mut self.remote_participants {
867 if let Ok(_) = speaker_ids.binary_search(sid) {
868 participant.speaking = true;
869 } else {
870 participant.speaking = false;
871 }
872 }
873 if let Some(id) = self.client.user_id() {
874 if let Some(room) = &mut self.live_kit {
875 if let Ok(_) = speaker_ids.binary_search(&id) {
876 room.speaking = true;
877 } else {
878 room.speaking = false;
879 }
880 }
881 }
882 cx.notify();
883 }
884 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
885 let mut found = false;
886 for participant in &mut self.remote_participants.values_mut() {
887 for track in participant.audio_tracks.values() {
888 if track.sid() == track_id {
889 found = true;
890 break;
891 }
892 }
893 if found {
894 participant.muted = muted;
895 break;
896 }
897 }
898
899 cx.notify();
900 }
901 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
902 let user_id = track.publisher_id().parse()?;
903 let track_id = track.sid().to_string();
904 let participant = self
905 .remote_participants
906 .get_mut(&user_id)
907 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
908
909 participant.audio_tracks.insert(track_id.clone(), track);
910 participant.muted = publication.is_muted();
911
912 cx.emit(Event::RemoteAudioTracksChanged {
913 participant_id: participant.peer_id,
914 });
915 }
916 RemoteAudioTrackUpdate::Unsubscribed {
917 publisher_id,
918 track_id,
919 } => {
920 let user_id = publisher_id.parse()?;
921 let participant = self
922 .remote_participants
923 .get_mut(&user_id)
924 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
925 participant.audio_tracks.remove(&track_id);
926 cx.emit(Event::RemoteAudioTracksChanged {
927 participant_id: participant.peer_id,
928 });
929 }
930 }
931
932 cx.notify();
933 Ok(())
934 }
935
/// In test builds, asserts that `participant_user_ids` is exactly the set of
/// remote and pending participant ids, and that the local user is in
/// neither list. Compiles to a no-op otherwise.
///
/// # Panics
///
/// In test builds, panics if an invariant is violated, or if there is at
/// least one participant while the client has no user id.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        // Equal sizes rule out ids that belong to neither list.
        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
955
956 pub(crate) fn call(
957 &mut self,
958 called_user_id: u64,
959 initial_project_id: Option<u64>,
960 cx: &mut ModelContext<Self>,
961 ) -> Task<Result<()>> {
962 if self.status.is_offline() {
963 return Task::ready(Err(anyhow!("room is offline")));
964 }
965
966 cx.notify();
967 let client = self.client.clone();
968 let room_id = self.id;
969 self.pending_call_count += 1;
970 cx.spawn(|this, mut cx| async move {
971 let result = client
972 .request(proto::Call {
973 room_id,
974 called_user_id,
975 initial_project_id,
976 })
977 .await;
978 this.update(&mut cx, |this, cx| {
979 this.pending_call_count -= 1;
980 if this.should_leave() {
981 this.leave(cx).detach_and_log_err(cx);
982 }
983 });
984 result?;
985 Ok(())
986 })
987 }
988
989 pub fn join_project(
990 &mut self,
991 id: u64,
992 language_registry: Arc<LanguageRegistry>,
993 fs: Arc<dyn Fs>,
994 cx: &mut ModelContext<Self>,
995 ) -> Task<Result<ModelHandle<Project>>> {
996 let client = self.client.clone();
997 let user_store = self.user_store.clone();
998 cx.spawn(|this, mut cx| async move {
999 let project =
1000 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1001
1002 this.update(&mut cx, |this, cx| {
1003 this.joined_projects.retain(|project| {
1004 if let Some(project) = project.upgrade(cx) {
1005 !project.read(cx).is_read_only()
1006 } else {
1007 false
1008 }
1009 });
1010 this.joined_projects.insert(project.downgrade());
1011 });
1012 Ok(project)
1013 })
1014 }
1015
1016 pub(crate) fn share_project(
1017 &mut self,
1018 project: ModelHandle<Project>,
1019 cx: &mut ModelContext<Self>,
1020 ) -> Task<Result<u64>> {
1021 if let Some(project_id) = project.read(cx).remote_id() {
1022 return Task::ready(Ok(project_id));
1023 }
1024
1025 let request = self.client.request(proto::ShareProject {
1026 room_id: self.id(),
1027 worktrees: project.read(cx).worktree_metadata_protos(cx),
1028 });
1029 cx.spawn(|this, mut cx| async move {
1030 let response = request.await?;
1031
1032 project.update(&mut cx, |project, cx| {
1033 project.shared(response.project_id, cx)
1034 })?;
1035
1036 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1037 this.update(&mut cx, |this, cx| {
1038 this.shared_projects.insert(project.downgrade());
1039 let active_project = this.local_participant.active_project.as_ref();
1040 if active_project.map_or(false, |location| *location == project) {
1041 this.set_location(Some(&project), cx)
1042 } else {
1043 Task::ready(Ok(()))
1044 }
1045 })
1046 .await?;
1047
1048 Ok(response.project_id)
1049 })
1050 }
1051
1052 pub(crate) fn unshare_project(
1053 &mut self,
1054 project: ModelHandle<Project>,
1055 cx: &mut ModelContext<Self>,
1056 ) -> Result<()> {
1057 let project_id = match project.read(cx).remote_id() {
1058 Some(project_id) => project_id,
1059 None => return Ok(()),
1060 };
1061
1062 self.client.send(proto::UnshareProject { project_id })?;
1063 project.update(cx, |this, cx| this.unshare(cx))
1064 }
1065
1066 pub(crate) fn set_location(
1067 &mut self,
1068 project: Option<&ModelHandle<Project>>,
1069 cx: &mut ModelContext<Self>,
1070 ) -> Task<Result<()>> {
1071 if self.status.is_offline() {
1072 return Task::ready(Err(anyhow!("room is offline")));
1073 }
1074
1075 let client = self.client.clone();
1076 let room_id = self.id;
1077 let location = if let Some(project) = project {
1078 self.local_participant.active_project = Some(project.downgrade());
1079 if let Some(project_id) = project.read(cx).remote_id() {
1080 proto::participant_location::Variant::SharedProject(
1081 proto::participant_location::SharedProject { id: project_id },
1082 )
1083 } else {
1084 proto::participant_location::Variant::UnsharedProject(
1085 proto::participant_location::UnsharedProject {},
1086 )
1087 }
1088 } else {
1089 self.local_participant.active_project = None;
1090 proto::participant_location::Variant::External(proto::participant_location::External {})
1091 };
1092
1093 cx.notify();
1094 cx.foreground().spawn(async move {
1095 client
1096 .request(proto::UpdateParticipantLocation {
1097 room_id,
1098 location: Some(proto::ParticipantLocation {
1099 variant: Some(location),
1100 }),
1101 })
1102 .await?;
1103 Ok(())
1104 })
1105 }
1106
1107 pub fn is_screen_sharing(&self) -> bool {
1108 self.live_kit.as_ref().map_or(false, |live_kit| {
1109 !matches!(live_kit.screen_track, LocalTrack::None)
1110 })
1111 }
1112
1113 pub fn is_sharing_mic(&self) -> bool {
1114 self.live_kit.as_ref().map_or(false, |live_kit| {
1115 !matches!(live_kit.microphone_track, LocalTrack::None)
1116 })
1117 }
1118
1119 pub fn is_muted(&self) -> bool {
1120 self.live_kit
1121 .as_ref()
1122 .and_then(|live_kit| match &live_kit.microphone_track {
1123 LocalTrack::None => Some(true),
1124 LocalTrack::Pending { muted, .. } => Some(*muted),
1125 LocalTrack::Published { muted, .. } => Some(*muted),
1126 })
1127 .unwrap_or(false)
1128 }
1129
1130 pub fn is_speaking(&self) -> bool {
1131 self.live_kit
1132 .as_ref()
1133 .map_or(false, |live_kit| live_kit.speaking)
1134 }
1135
1136 pub fn is_deafened(&self) -> Option<bool> {
1137 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1138 }
1139
/// Starts publishing the local microphone to the LiveKit room.
///
/// The microphone track is put into `LocalTrack::Pending` (unmuted) with a
/// fresh publish id right away; the returned task creates and publishes the
/// audio track. When publishing finishes, the result is applied only if the
/// track is still pending with the same publish id. Otherwise the attempt
/// counts as canceled: a successful publication is unpublished again and a
/// failure is ignored.
///
/// Returns an error task immediately when the room is offline, the
/// microphone is already shared (or pending), or LiveKit was not
/// initialized.
#[track_caller]
pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    if self.status.is_offline() {
        return Task::ready(Err(anyhow!("room is offline")));
    } else if self.is_sharing_mic() {
        return Task::ready(Err(anyhow!("microphone was already shared")));
    }

    let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
        // The publish id identifies this attempt when it completes below.
        let publish_id = post_inc(&mut live_kit.next_publish_id);
        live_kit.microphone_track = LocalTrack::Pending {
            publish_id,
            muted: false,
        };
        cx.notify();
        publish_id
    } else {
        return Task::ready(Err(anyhow!("live-kit was not initialized")));
    };

    cx.spawn_weak(|this, mut cx| async move {
        let publish_track = async {
            let track = LocalAudioTrack::create();
            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .read_with(&cx, |this, _| {
                    this.live_kit
                        .as_ref()
                        .map(|live_kit| live_kit.room.publish_audio_track(&track))
                })
                .ok_or_else(|| anyhow!("live-kit was not initialized"))?
                .await
        };

        let publication = publish_track.await;
        this.upgrade(&cx)
            .ok_or_else(|| anyhow!("room was dropped"))?
            .update(&mut cx, |this, cx| {
                let live_kit = this
                    .live_kit
                    .as_mut()
                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

                // Canceled unless the track is still pending with our publish
                // id. `muted` carries the flag stored on the pending track.
                let (canceled, muted) = if let LocalTrack::Pending {
                    publish_id: cur_publish_id,
                    muted,
                } = &live_kit.microphone_track
                {
                    (*cur_publish_id != publish_id, *muted)
                } else {
                    (true, false)
                };

                match publication {
                    Ok(publication) => {
                        if canceled {
                            live_kit.room.unpublish_track(publication);
                        } else {
                            if muted {
                                cx.background().spawn(publication.set_mute(muted)).detach();
                            }
                            live_kit.microphone_track = LocalTrack::Published {
                                track_publication: publication,
                                muted,
                            };
                            cx.notify();
                        }
                        Ok(())
                    }
                    Err(error) => {
                        if canceled {
                            Ok(())
                        } else {
                            // Reset the track so sharing can be retried.
                            live_kit.microphone_track = LocalTrack::None;
                            cx.notify();
                            Err(error)
                        }
                    }
                }
            })
    })
}
1222
1223 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1224 if self.status.is_offline() {
1225 return Task::ready(Err(anyhow!("room is offline")));
1226 } else if self.is_screen_sharing() {
1227 return Task::ready(Err(anyhow!("screen was already shared")));
1228 }
1229
1230 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1231 let publish_id = post_inc(&mut live_kit.next_publish_id);
1232 live_kit.screen_track = LocalTrack::Pending {
1233 publish_id,
1234 muted: false,
1235 };
1236 cx.notify();
1237 (live_kit.room.display_sources(), publish_id)
1238 } else {
1239 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1240 };
1241
1242 cx.spawn_weak(|this, mut cx| async move {
1243 let publish_track = async {
1244 let displays = displays.await?;
1245 let display = displays
1246 .first()
1247 .ok_or_else(|| anyhow!("no display found"))?;
1248 let track = LocalVideoTrack::screen_share_for_display(&display);
1249 this.upgrade(&cx)
1250 .ok_or_else(|| anyhow!("room was dropped"))?
1251 .read_with(&cx, |this, _| {
1252 this.live_kit
1253 .as_ref()
1254 .map(|live_kit| live_kit.room.publish_video_track(&track))
1255 })
1256 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1257 .await
1258 };
1259
1260 let publication = publish_track.await;
1261 this.upgrade(&cx)
1262 .ok_or_else(|| anyhow!("room was dropped"))?
1263 .update(&mut cx, |this, cx| {
1264 let live_kit = this
1265 .live_kit
1266 .as_mut()
1267 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1268
1269 let (canceled, muted) = if let LocalTrack::Pending {
1270 publish_id: cur_publish_id,
1271 muted,
1272 } = &live_kit.screen_track
1273 {
1274 (*cur_publish_id != publish_id, *muted)
1275 } else {
1276 (true, false)
1277 };
1278
1279 match publication {
1280 Ok(publication) => {
1281 if canceled {
1282 live_kit.room.unpublish_track(publication);
1283 } else {
1284 if muted {
1285 cx.background().spawn(publication.set_mute(muted)).detach();
1286 }
1287 live_kit.screen_track = LocalTrack::Published {
1288 track_publication: publication,
1289 muted,
1290 };
1291 cx.notify();
1292 }
1293
1294 Audio::play_sound(Sound::StartScreenshare, cx);
1295
1296 Ok(())
1297 }
1298 Err(error) => {
1299 if canceled {
1300 Ok(())
1301 } else {
1302 live_kit.screen_track = LocalTrack::None;
1303 cx.notify();
1304 Err(error)
1305 }
1306 }
1307 }
1308 })
1309 })
1310 }
1311
1312 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1313 let should_mute = !self.is_muted();
1314 if let Some(live_kit) = self.live_kit.as_mut() {
1315 if matches!(live_kit.microphone_track, LocalTrack::None) {
1316 return Ok(self.share_microphone(cx));
1317 }
1318
1319 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1320 live_kit.muted_by_user = should_mute;
1321
1322 if old_muted == true && live_kit.deafened == true {
1323 if let Some(task) = self.toggle_deafen(cx).ok() {
1324 task.detach();
1325 }
1326 }
1327
1328 Ok(ret_task)
1329 } else {
1330 Err(anyhow!("LiveKit not started"))
1331 }
1332 }
1333
1334 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1335 if let Some(live_kit) = self.live_kit.as_mut() {
1336 (*live_kit).deafened = !live_kit.deafened;
1337
1338 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1339 // Context notification is sent within set_mute itself.
1340 let mut mute_task = None;
1341 // When deafening, mute user's mic as well.
1342 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1343 if live_kit.deafened || !live_kit.muted_by_user {
1344 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1345 };
1346 for participant in self.remote_participants.values() {
1347 for track in live_kit
1348 .room
1349 .remote_audio_track_publications(&participant.user.id.to_string())
1350 {
1351 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1352 }
1353 }
1354
1355 Ok(cx.foreground().spawn(async move {
1356 if let Some(mute_task) = mute_task {
1357 mute_task.await?;
1358 }
1359 for task in tasks {
1360 task.await?;
1361 }
1362 Ok(())
1363 }))
1364 } else {
1365 Err(anyhow!("LiveKit not started"))
1366 }
1367 }
1368
1369 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1370 if self.status.is_offline() {
1371 return Err(anyhow!("room is offline"));
1372 }
1373
1374 let live_kit = self
1375 .live_kit
1376 .as_mut()
1377 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1378 match mem::take(&mut live_kit.screen_track) {
1379 LocalTrack::None => Err(anyhow!("screen was not shared")),
1380 LocalTrack::Pending { .. } => {
1381 cx.notify();
1382 Ok(())
1383 }
1384 LocalTrack::Published {
1385 track_publication, ..
1386 } => {
1387 live_kit.room.unpublish_track(track_publication);
1388 cx.notify();
1389
1390 Audio::play_sound(Sound::StopScreenshare, cx);
1391 Ok(())
1392 }
1393 }
1394 }
1395
/// Test-only helper that forwards `sources` to the LiveKit room as its
/// display sources.
///
/// # Panics
///
/// Panics when LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1404}
1405
1406struct LiveKitRoom {
1407 room: Arc<live_kit_client::Room>,
1408 screen_track: LocalTrack,
1409 microphone_track: LocalTrack,
1410 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1411 muted_by_user: bool,
1412 deafened: bool,
1413 speaking: bool,
1414 next_publish_id: usize,
1415 _maintain_room: Task<()>,
1416 _maintain_tracks: [Task<()>; 2],
1417}
1418
1419impl LiveKitRoom {
1420 fn set_mute(
1421 self: &mut LiveKitRoom,
1422 should_mute: bool,
1423 cx: &mut ModelContext<Room>,
1424 ) -> Result<(Task<Result<()>>, bool)> {
1425 if !should_mute {
1426 // clear user muting state.
1427 self.muted_by_user = false;
1428 }
1429
1430 let (result, old_muted) = match &mut self.microphone_track {
1431 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1432 LocalTrack::Pending { muted, .. } => {
1433 let old_muted = *muted;
1434 *muted = should_mute;
1435 cx.notify();
1436 Ok((Task::Ready(Some(Ok(()))), old_muted))
1437 }
1438 LocalTrack::Published {
1439 track_publication,
1440 muted,
1441 } => {
1442 let old_muted = *muted;
1443 *muted = should_mute;
1444 cx.notify();
1445 Ok((
1446 cx.background().spawn(track_publication.set_mute(*muted)),
1447 old_muted,
1448 ))
1449 }
1450 }?;
1451
1452 if old_muted != should_mute {
1453 if should_mute {
1454 Audio::play_sound(Sound::Mute, cx);
1455 } else {
1456 Audio::play_sound(Sound::Unmute, cx);
1457 }
1458 }
1459
1460 Ok((result, old_muted))
1461 }
1462}
1463
1464enum LocalTrack {
1465 None,
1466 Pending {
1467 publish_id: usize,
1468 muted: bool,
1469 },
1470 Published {
1471 track_publication: LocalTrackPublication,
1472 muted: bool,
1473 },
1474}
1475
1476impl Default for LocalTrack {
1477 fn default() -> Self {
1478 Self::None
1479 }
1480}
1481
/// Status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == RoomStatus::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == RoomStatus::Online
    }
}