1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
4 IncomingCall,
5};
6use anyhow::{anyhow, Result};
7use audio::{Audio, Sound};
8use client::{
9 proto::{self, PeerId},
10 Client, TypedEnvelope, User, UserStore,
11};
12use collections::{BTreeMap, HashMap, HashSet};
13use fs::Fs;
14use futures::{FutureExt, StreamExt};
15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
16use language::LanguageRegistry;
17use live_kit_client::{
18 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
19 RemoteVideoTrackUpdate,
20};
21use postage::stream::Stream;
22use project::Project;
23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
24use util::{post_inc, ResultExt, TryFutureExt};
25
26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum Event {
30 ParticipantLocationChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteVideoTracksChanged {
34 participant_id: proto::PeerId,
35 },
36 RemoteAudioTracksChanged {
37 participant_id: proto::PeerId,
38 },
39 RemoteProjectShared {
40 owner: Arc<User>,
41 project_id: u64,
42 worktree_root_names: Vec<String>,
43 },
44 RemoteProjectUnshared {
45 project_id: u64,
46 },
47 Left,
48}
49
50pub struct Room {
51 id: u64,
52 channel_id: Option<u64>,
53 live_kit: Option<LiveKitRoom>,
54 status: RoomStatus,
55 shared_projects: HashSet<WeakModelHandle<Project>>,
56 joined_projects: HashSet<WeakModelHandle<Project>>,
57 local_participant: LocalParticipant,
58 remote_participants: BTreeMap<u64, RemoteParticipant>,
59 pending_participants: Vec<Arc<User>>,
60 participant_user_ids: HashSet<u64>,
61 pending_call_count: usize,
62 leave_when_empty: bool,
63 client: Arc<Client>,
64 user_store: ModelHandle<UserStore>,
65 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
66 subscriptions: Vec<client::Subscription>,
67 pending_room_update: Option<Task<()>>,
68 maintain_connection: Option<Task<Option<()>>>,
69}
70
71impl Entity for Room {
72 type Event = Event;
73
74 fn release(&mut self, cx: &mut AppContext) {
75 if self.status.is_online() {
76 self.leave_internal(cx).detach_and_log_err(cx);
77 }
78 }
79
80 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
81 if self.status.is_online() {
82 let leave = self.leave_internal(cx);
83 Some(
84 cx.background()
85 .spawn(async move {
86 leave.await.log_err();
87 })
88 .boxed(),
89 )
90 } else {
91 None
92 }
93 }
94}
95
96impl Room {
97 pub fn channel_id(&self) -> Option<u64> {
98 self.channel_id
99 }
100
101 #[cfg(any(test, feature = "test-support"))]
102 pub fn is_connected(&self) -> bool {
103 if let Some(live_kit) = self.live_kit.as_ref() {
104 matches!(
105 *live_kit.room.status().borrow(),
106 live_kit_client::ConnectionState::Connected { .. }
107 )
108 } else {
109 false
110 }
111 }
112
113 fn new(
114 id: u64,
115 channel_id: Option<u64>,
116 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
117 client: Arc<Client>,
118 user_store: ModelHandle<UserStore>,
119 cx: &mut ModelContext<Self>,
120 ) -> Self {
121 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
122 let room = live_kit_client::Room::new();
123 let mut status = room.status();
124 // Consume the initial status of the room.
125 let _ = status.try_recv();
126 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
127 while let Some(status) = status.next().await {
128 let this = if let Some(this) = this.upgrade(&cx) {
129 this
130 } else {
131 break;
132 };
133
134 if status == live_kit_client::ConnectionState::Disconnected {
135 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
136 break;
137 }
138 }
139 });
140
141 let mut track_video_changes = room.remote_video_track_updates();
142 let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
143 while let Some(track_change) = track_video_changes.next().await {
144 let this = if let Some(this) = this.upgrade(&cx) {
145 this
146 } else {
147 break;
148 };
149
150 this.update(&mut cx, |this, cx| {
151 this.remote_video_track_updated(track_change, cx).log_err()
152 });
153 }
154 });
155
156 let mut track_audio_changes = room.remote_audio_track_updates();
157 let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
158 while let Some(track_change) = track_audio_changes.next().await {
159 let this = if let Some(this) = this.upgrade(&cx) {
160 this
161 } else {
162 break;
163 };
164
165 this.update(&mut cx, |this, cx| {
166 this.remote_audio_track_updated(track_change, cx).log_err()
167 });
168 }
169 });
170
171 let connect = room.connect(&connection_info.server_url, &connection_info.token);
172 cx.spawn(|this, mut cx| async move {
173 connect.await?;
174
175 if !cx.read(|cx| settings::get::<CallSettings>(cx).mute_on_join) {
176 this.update(&mut cx, |this, cx| this.share_microphone(cx))
177 .await?;
178 }
179
180 anyhow::Ok(())
181 })
182 .detach_and_log_err(cx);
183
184 Some(LiveKitRoom {
185 room,
186 screen_track: LocalTrack::None,
187 microphone_track: LocalTrack::None,
188 next_publish_id: 0,
189 muted_by_user: false,
190 deafened: false,
191 speaking: false,
192 _maintain_room,
193 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
194 })
195 } else {
196 None
197 };
198
199 let maintain_connection =
200 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
201
202 Audio::play_sound(Sound::Joined, cx);
203
204 Self {
205 id,
206 channel_id,
207 live_kit: live_kit_room,
208 status: RoomStatus::Online,
209 shared_projects: Default::default(),
210 joined_projects: Default::default(),
211 participant_user_ids: Default::default(),
212 local_participant: Default::default(),
213 remote_participants: Default::default(),
214 pending_participants: Default::default(),
215 pending_call_count: 0,
216 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
217 leave_when_empty: false,
218 pending_room_update: None,
219 client,
220 user_store,
221 follows_by_leader_id_project_id: Default::default(),
222 maintain_connection: Some(maintain_connection),
223 }
224 }
225
226 pub(crate) fn create(
227 called_user_id: u64,
228 initial_project: Option<ModelHandle<Project>>,
229 client: Arc<Client>,
230 user_store: ModelHandle<UserStore>,
231 cx: &mut AppContext,
232 ) -> Task<Result<ModelHandle<Self>>> {
233 cx.spawn(|mut cx| async move {
234 let response = client.request(proto::CreateRoom {}).await?;
235 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
236 let room = cx.add_model(|cx| {
237 Self::new(
238 room_proto.id,
239 None,
240 response.live_kit_connection_info,
241 client,
242 user_store,
243 cx,
244 )
245 });
246
247 let initial_project_id = if let Some(initial_project) = initial_project {
248 let initial_project_id = room
249 .update(&mut cx, |room, cx| {
250 room.share_project(initial_project.clone(), cx)
251 })
252 .await?;
253 Some(initial_project_id)
254 } else {
255 None
256 };
257
258 match room
259 .update(&mut cx, |room, cx| {
260 room.leave_when_empty = true;
261 room.call(called_user_id, initial_project_id, cx)
262 })
263 .await
264 {
265 Ok(()) => Ok(room),
266 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
267 }
268 })
269 }
270
271 pub(crate) fn join_channel(
272 channel_id: u64,
273 client: Arc<Client>,
274 user_store: ModelHandle<UserStore>,
275 cx: &mut AppContext,
276 ) -> Task<Result<ModelHandle<Self>>> {
277 cx.spawn(|cx| async move {
278 Self::from_join_response(
279 client.request(proto::JoinChannel { channel_id }).await?,
280 client,
281 user_store,
282 cx,
283 )
284 })
285 }
286
287 pub(crate) fn join(
288 call: &IncomingCall,
289 client: Arc<Client>,
290 user_store: ModelHandle<UserStore>,
291 cx: &mut AppContext,
292 ) -> Task<Result<ModelHandle<Self>>> {
293 let id = call.room_id;
294 cx.spawn(|cx| async move {
295 Self::from_join_response(
296 client.request(proto::JoinRoom { id }).await?,
297 client,
298 user_store,
299 cx,
300 )
301 })
302 }
303
304 fn from_join_response(
305 response: proto::JoinRoomResponse,
306 client: Arc<Client>,
307 user_store: ModelHandle<UserStore>,
308 mut cx: AsyncAppContext,
309 ) -> Result<ModelHandle<Self>> {
310 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
311 let room = cx.add_model(|cx| {
312 Self::new(
313 room_proto.id,
314 response.channel_id,
315 response.live_kit_connection_info,
316 client,
317 user_store,
318 cx,
319 )
320 });
321 room.update(&mut cx, |room, cx| {
322 room.leave_when_empty = room.channel_id.is_none();
323 room.apply_room_update(room_proto, cx)?;
324 anyhow::Ok(())
325 })?;
326 Ok(room)
327 }
328
329 fn should_leave(&self) -> bool {
330 self.leave_when_empty
331 && self.pending_room_update.is_none()
332 && self.pending_participants.is_empty()
333 && self.remote_participants.is_empty()
334 && self.pending_call_count == 0
335 }
336
337 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
338 cx.notify();
339 cx.emit(Event::Left);
340 self.leave_internal(cx)
341 }
342
343 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
344 if self.status.is_offline() {
345 return Task::ready(Err(anyhow!("room is offline")));
346 }
347
348 log::info!("leaving room");
349 Audio::play_sound(Sound::Leave, cx);
350
351 self.clear_state(cx);
352
353 let leave_room = self.client.request(proto::LeaveRoom {});
354 cx.background().spawn(async move {
355 leave_room.await?;
356 anyhow::Ok(())
357 })
358 }
359
360 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
361 for project in self.shared_projects.drain() {
362 if let Some(project) = project.upgrade(cx) {
363 project.update(cx, |project, cx| {
364 project.unshare(cx).log_err();
365 });
366 }
367 }
368 for project in self.joined_projects.drain() {
369 if let Some(project) = project.upgrade(cx) {
370 project.update(cx, |project, cx| {
371 project.disconnected_from_host(cx);
372 project.close(cx);
373 });
374 }
375 }
376
377 self.status = RoomStatus::Offline;
378 self.remote_participants.clear();
379 self.pending_participants.clear();
380 self.participant_user_ids.clear();
381 self.subscriptions.clear();
382 self.live_kit.take();
383 self.pending_room_update.take();
384 self.maintain_connection.take();
385 }
386
387 async fn maintain_connection(
388 this: WeakModelHandle<Self>,
389 client: Arc<Client>,
390 mut cx: AsyncAppContext,
391 ) -> Result<()> {
392 let mut client_status = client.status();
393 loop {
394 let _ = client_status.try_recv();
395 let is_connected = client_status.borrow().is_connected();
396 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
397 if !is_connected || client_status.next().await.is_some() {
398 log::info!("detected client disconnection");
399
400 this.upgrade(&cx)
401 .ok_or_else(|| anyhow!("room was dropped"))?
402 .update(&mut cx, |this, cx| {
403 this.status = RoomStatus::Rejoining;
404 cx.notify();
405 });
406
407 // Wait for client to re-establish a connection to the server.
408 {
409 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
410 let client_reconnection = async {
411 let mut remaining_attempts = 3;
412 while remaining_attempts > 0 {
413 if client_status.borrow().is_connected() {
414 log::info!("client reconnected, attempting to rejoin room");
415
416 let Some(this) = this.upgrade(&cx) else { break };
417 if this
418 .update(&mut cx, |this, cx| this.rejoin(cx))
419 .await
420 .log_err()
421 .is_some()
422 {
423 return true;
424 } else {
425 remaining_attempts -= 1;
426 }
427 } else if client_status.borrow().is_signed_out() {
428 return false;
429 }
430
431 log::info!(
432 "waiting for client status change, remaining attempts {}",
433 remaining_attempts
434 );
435 client_status.next().await;
436 }
437 false
438 }
439 .fuse();
440 futures::pin_mut!(client_reconnection);
441
442 futures::select_biased! {
443 reconnected = client_reconnection => {
444 if reconnected {
445 log::info!("successfully reconnected to room");
446 // If we successfully joined the room, go back around the loop
447 // waiting for future connection status changes.
448 continue;
449 }
450 }
451 _ = reconnection_timeout => {
452 log::info!("room reconnection timeout expired");
453 }
454 }
455 }
456
457 break;
458 }
459 }
460
461 // The client failed to re-establish a connection to the server
462 // or an error occurred while trying to re-join the room. Either way
463 // we leave the room and return an error.
464 if let Some(this) = this.upgrade(&cx) {
465 log::info!("reconnection failed, leaving room");
466 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
467 }
468 Err(anyhow!(
469 "can't reconnect to room: client failed to re-establish connection"
470 ))
471 }
472
473 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
474 let mut projects = HashMap::default();
475 let mut reshared_projects = Vec::new();
476 let mut rejoined_projects = Vec::new();
477 self.shared_projects.retain(|project| {
478 if let Some(handle) = project.upgrade(cx) {
479 let project = handle.read(cx);
480 if let Some(project_id) = project.remote_id() {
481 projects.insert(project_id, handle.clone());
482 reshared_projects.push(proto::UpdateProject {
483 project_id,
484 worktrees: project.worktree_metadata_protos(cx),
485 });
486 return true;
487 }
488 }
489 false
490 });
491 self.joined_projects.retain(|project| {
492 if let Some(handle) = project.upgrade(cx) {
493 let project = handle.read(cx);
494 if let Some(project_id) = project.remote_id() {
495 projects.insert(project_id, handle.clone());
496 rejoined_projects.push(proto::RejoinProject {
497 id: project_id,
498 worktrees: project
499 .worktrees(cx)
500 .map(|worktree| {
501 let worktree = worktree.read(cx);
502 proto::RejoinWorktree {
503 id: worktree.id().to_proto(),
504 scan_id: worktree.completed_scan_id() as u64,
505 }
506 })
507 .collect(),
508 });
509 }
510 return true;
511 }
512 false
513 });
514
515 let response = self.client.request_envelope(proto::RejoinRoom {
516 id: self.id,
517 reshared_projects,
518 rejoined_projects,
519 });
520
521 cx.spawn(|this, mut cx| async move {
522 let response = response.await?;
523 let message_id = response.message_id;
524 let response = response.payload;
525 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
526 this.update(&mut cx, |this, cx| {
527 this.status = RoomStatus::Online;
528 this.apply_room_update(room_proto, cx)?;
529
530 for reshared_project in response.reshared_projects {
531 if let Some(project) = projects.get(&reshared_project.id) {
532 project.update(cx, |project, cx| {
533 project.reshared(reshared_project, cx).log_err();
534 });
535 }
536 }
537
538 for rejoined_project in response.rejoined_projects {
539 if let Some(project) = projects.get(&rejoined_project.id) {
540 project.update(cx, |project, cx| {
541 project.rejoined(rejoined_project, message_id, cx).log_err();
542 });
543 }
544 }
545
546 anyhow::Ok(())
547 })
548 })
549 }
550
551 pub fn id(&self) -> u64 {
552 self.id
553 }
554
555 pub fn status(&self) -> RoomStatus {
556 self.status
557 }
558
559 pub fn local_participant(&self) -> &LocalParticipant {
560 &self.local_participant
561 }
562
563 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
564 &self.remote_participants
565 }
566
567 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
568 self.remote_participants
569 .values()
570 .find(|p| p.peer_id == peer_id)
571 }
572
573 pub fn pending_participants(&self) -> &[Arc<User>] {
574 &self.pending_participants
575 }
576
577 pub fn contains_participant(&self, user_id: u64) -> bool {
578 self.participant_user_ids.contains(&user_id)
579 }
580
581 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
582 self.follows_by_leader_id_project_id
583 .get(&(leader_id, project_id))
584 .map_or(&[], |v| v.as_slice())
585 }
586
587 async fn handle_room_updated(
588 this: ModelHandle<Self>,
589 envelope: TypedEnvelope<proto::RoomUpdated>,
590 _: Arc<Client>,
591 mut cx: AsyncAppContext,
592 ) -> Result<()> {
593 let room = envelope
594 .payload
595 .room
596 .ok_or_else(|| anyhow!("invalid room"))?;
597 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
598 }
599
600 fn apply_room_update(
601 &mut self,
602 mut room: proto::Room,
603 cx: &mut ModelContext<Self>,
604 ) -> Result<()> {
605 // Filter ourselves out from the room's participants.
606 let local_participant_ix = room
607 .participants
608 .iter()
609 .position(|participant| Some(participant.user_id) == self.client.user_id());
610 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
611
612 let pending_participant_user_ids = room
613 .pending_participants
614 .iter()
615 .map(|p| p.user_id)
616 .collect::<Vec<_>>();
617
618 let remote_participant_user_ids = room
619 .participants
620 .iter()
621 .map(|p| p.user_id)
622 .collect::<Vec<_>>();
623
624 let (remote_participants, pending_participants) =
625 self.user_store.update(cx, move |user_store, cx| {
626 (
627 user_store.get_users(remote_participant_user_ids, cx),
628 user_store.get_users(pending_participant_user_ids, cx),
629 )
630 });
631
632 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
633 let (remote_participants, pending_participants) =
634 futures::join!(remote_participants, pending_participants);
635
636 this.update(&mut cx, |this, cx| {
637 this.participant_user_ids.clear();
638
639 if let Some(participant) = local_participant {
640 this.local_participant.projects = participant.projects;
641 } else {
642 this.local_participant.projects.clear();
643 }
644
645 if let Some(participants) = remote_participants.log_err() {
646 for (participant, user) in room.participants.into_iter().zip(participants) {
647 let Some(peer_id) = participant.peer_id else {
648 continue;
649 };
650 this.participant_user_ids.insert(participant.user_id);
651
652 let old_projects = this
653 .remote_participants
654 .get(&participant.user_id)
655 .into_iter()
656 .flat_map(|existing| &existing.projects)
657 .map(|project| project.id)
658 .collect::<HashSet<_>>();
659 let new_projects = participant
660 .projects
661 .iter()
662 .map(|project| project.id)
663 .collect::<HashSet<_>>();
664
665 for project in &participant.projects {
666 if !old_projects.contains(&project.id) {
667 cx.emit(Event::RemoteProjectShared {
668 owner: user.clone(),
669 project_id: project.id,
670 worktree_root_names: project.worktree_root_names.clone(),
671 });
672 }
673 }
674
675 for unshared_project_id in old_projects.difference(&new_projects) {
676 this.joined_projects.retain(|project| {
677 if let Some(project) = project.upgrade(cx) {
678 project.update(cx, |project, cx| {
679 if project.remote_id() == Some(*unshared_project_id) {
680 project.disconnected_from_host(cx);
681 false
682 } else {
683 true
684 }
685 })
686 } else {
687 false
688 }
689 });
690 cx.emit(Event::RemoteProjectUnshared {
691 project_id: *unshared_project_id,
692 });
693 }
694
695 let location = ParticipantLocation::from_proto(participant.location)
696 .unwrap_or(ParticipantLocation::External);
697 if let Some(remote_participant) =
698 this.remote_participants.get_mut(&participant.user_id)
699 {
700 remote_participant.projects = participant.projects;
701 remote_participant.peer_id = peer_id;
702 if location != remote_participant.location {
703 remote_participant.location = location;
704 cx.emit(Event::ParticipantLocationChanged {
705 participant_id: peer_id,
706 });
707 }
708 } else {
709 this.remote_participants.insert(
710 participant.user_id,
711 RemoteParticipant {
712 user: user.clone(),
713 peer_id,
714 projects: participant.projects,
715 location,
716 muted: true,
717 speaking: false,
718 video_tracks: Default::default(),
719 audio_tracks: Default::default(),
720 },
721 );
722
723 Audio::play_sound(Sound::Joined, cx);
724
725 if let Some(live_kit) = this.live_kit.as_ref() {
726 let video_tracks =
727 live_kit.room.remote_video_tracks(&user.id.to_string());
728 let audio_tracks =
729 live_kit.room.remote_audio_tracks(&user.id.to_string());
730 let publications = live_kit
731 .room
732 .remote_audio_track_publications(&user.id.to_string());
733
734 for track in video_tracks {
735 this.remote_video_track_updated(
736 RemoteVideoTrackUpdate::Subscribed(track),
737 cx,
738 )
739 .log_err();
740 }
741
742 for (track, publication) in
743 audio_tracks.iter().zip(publications.iter())
744 {
745 this.remote_audio_track_updated(
746 RemoteAudioTrackUpdate::Subscribed(
747 track.clone(),
748 publication.clone(),
749 ),
750 cx,
751 )
752 .log_err();
753 }
754 }
755 }
756 }
757
758 this.remote_participants.retain(|user_id, participant| {
759 if this.participant_user_ids.contains(user_id) {
760 true
761 } else {
762 for project in &participant.projects {
763 cx.emit(Event::RemoteProjectUnshared {
764 project_id: project.id,
765 });
766 }
767 false
768 }
769 });
770 }
771
772 if let Some(pending_participants) = pending_participants.log_err() {
773 this.pending_participants = pending_participants;
774 for participant in &this.pending_participants {
775 this.participant_user_ids.insert(participant.id);
776 }
777 }
778
779 this.follows_by_leader_id_project_id.clear();
780 for follower in room.followers {
781 let project_id = follower.project_id;
782 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
783 (Some(leader), Some(follower)) => (leader, follower),
784
785 _ => {
786 log::error!("Follower message {follower:?} missing some state");
787 continue;
788 }
789 };
790
791 let list = this
792 .follows_by_leader_id_project_id
793 .entry((leader, project_id))
794 .or_insert(Vec::new());
795 if !list.contains(&follower) {
796 list.push(follower);
797 }
798 }
799
800 this.pending_room_update.take();
801 if this.should_leave() {
802 log::info!("room is empty, leaving");
803 let _ = this.leave(cx);
804 }
805
806 this.check_invariants();
807 cx.notify();
808 });
809 }));
810
811 cx.notify();
812 Ok(())
813 }
814
815 fn remote_video_track_updated(
816 &mut self,
817 change: RemoteVideoTrackUpdate,
818 cx: &mut ModelContext<Self>,
819 ) -> Result<()> {
820 match change {
821 RemoteVideoTrackUpdate::Subscribed(track) => {
822 let user_id = track.publisher_id().parse()?;
823 let track_id = track.sid().to_string();
824 let participant = self
825 .remote_participants
826 .get_mut(&user_id)
827 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
828 participant.video_tracks.insert(
829 track_id.clone(),
830 Arc::new(RemoteVideoTrack {
831 live_kit_track: track,
832 }),
833 );
834 cx.emit(Event::RemoteVideoTracksChanged {
835 participant_id: participant.peer_id,
836 });
837 }
838 RemoteVideoTrackUpdate::Unsubscribed {
839 publisher_id,
840 track_id,
841 } => {
842 let user_id = publisher_id.parse()?;
843 let participant = self
844 .remote_participants
845 .get_mut(&user_id)
846 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
847 participant.video_tracks.remove(&track_id);
848 cx.emit(Event::RemoteVideoTracksChanged {
849 participant_id: participant.peer_id,
850 });
851 }
852 }
853
854 cx.notify();
855 Ok(())
856 }
857
858 fn remote_audio_track_updated(
859 &mut self,
860 change: RemoteAudioTrackUpdate,
861 cx: &mut ModelContext<Self>,
862 ) -> Result<()> {
863 match change {
864 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
865 let mut speaker_ids = speakers
866 .into_iter()
867 .filter_map(|speaker_sid| speaker_sid.parse().ok())
868 .collect::<Vec<u64>>();
869 speaker_ids.sort_unstable();
870 for (sid, participant) in &mut self.remote_participants {
871 if let Ok(_) = speaker_ids.binary_search(sid) {
872 participant.speaking = true;
873 } else {
874 participant.speaking = false;
875 }
876 }
877 if let Some(id) = self.client.user_id() {
878 if let Some(room) = &mut self.live_kit {
879 if let Ok(_) = speaker_ids.binary_search(&id) {
880 room.speaking = true;
881 } else {
882 room.speaking = false;
883 }
884 }
885 }
886 cx.notify();
887 }
888 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
889 let mut found = false;
890 for participant in &mut self.remote_participants.values_mut() {
891 for track in participant.audio_tracks.values() {
892 if track.sid() == track_id {
893 found = true;
894 break;
895 }
896 }
897 if found {
898 participant.muted = muted;
899 break;
900 }
901 }
902
903 cx.notify();
904 }
905 RemoteAudioTrackUpdate::Subscribed(track, publication) => {
906 let user_id = track.publisher_id().parse()?;
907 let track_id = track.sid().to_string();
908 let participant = self
909 .remote_participants
910 .get_mut(&user_id)
911 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
912
913 participant.audio_tracks.insert(track_id.clone(), track);
914 participant.muted = publication.is_muted();
915
916 cx.emit(Event::RemoteAudioTracksChanged {
917 participant_id: participant.peer_id,
918 });
919 }
920 RemoteAudioTrackUpdate::Unsubscribed {
921 publisher_id,
922 track_id,
923 } => {
924 let user_id = publisher_id.parse()?;
925 let participant = self
926 .remote_participants
927 .get_mut(&user_id)
928 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
929 participant.audio_tracks.remove(&track_id);
930 cx.emit(Event::RemoteAudioTracksChanged {
931 participant_id: participant.peer_id,
932 });
933 }
934 }
935
936 cx.notify();
937 Ok(())
938 }
939
940 fn check_invariants(&self) {
941 #[cfg(any(test, feature = "test-support"))]
942 {
943 for participant in self.remote_participants.values() {
944 assert!(self.participant_user_ids.contains(&participant.user.id));
945 assert_ne!(participant.user.id, self.client.user_id().unwrap());
946 }
947
948 for participant in &self.pending_participants {
949 assert!(self.participant_user_ids.contains(&participant.id));
950 assert_ne!(participant.id, self.client.user_id().unwrap());
951 }
952
953 assert_eq!(
954 self.participant_user_ids.len(),
955 self.remote_participants.len() + self.pending_participants.len()
956 );
957 }
958 }
959
960 pub(crate) fn call(
961 &mut self,
962 called_user_id: u64,
963 initial_project_id: Option<u64>,
964 cx: &mut ModelContext<Self>,
965 ) -> Task<Result<()>> {
966 if self.status.is_offline() {
967 return Task::ready(Err(anyhow!("room is offline")));
968 }
969
970 cx.notify();
971 let client = self.client.clone();
972 let room_id = self.id;
973 self.pending_call_count += 1;
974 cx.spawn(|this, mut cx| async move {
975 let result = client
976 .request(proto::Call {
977 room_id,
978 called_user_id,
979 initial_project_id,
980 })
981 .await;
982 this.update(&mut cx, |this, cx| {
983 this.pending_call_count -= 1;
984 if this.should_leave() {
985 this.leave(cx).detach_and_log_err(cx);
986 }
987 });
988 result?;
989 Ok(())
990 })
991 }
992
993 pub fn join_project(
994 &mut self,
995 id: u64,
996 language_registry: Arc<LanguageRegistry>,
997 fs: Arc<dyn Fs>,
998 cx: &mut ModelContext<Self>,
999 ) -> Task<Result<ModelHandle<Project>>> {
1000 let client = self.client.clone();
1001 let user_store = self.user_store.clone();
1002 cx.spawn(|this, mut cx| async move {
1003 let project =
1004 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1005
1006 this.update(&mut cx, |this, cx| {
1007 this.joined_projects.retain(|project| {
1008 if let Some(project) = project.upgrade(cx) {
1009 !project.read(cx).is_read_only()
1010 } else {
1011 false
1012 }
1013 });
1014 this.joined_projects.insert(project.downgrade());
1015 });
1016 Ok(project)
1017 })
1018 }
1019
1020 pub(crate) fn share_project(
1021 &mut self,
1022 project: ModelHandle<Project>,
1023 cx: &mut ModelContext<Self>,
1024 ) -> Task<Result<u64>> {
1025 if let Some(project_id) = project.read(cx).remote_id() {
1026 return Task::ready(Ok(project_id));
1027 }
1028
1029 let request = self.client.request(proto::ShareProject {
1030 room_id: self.id(),
1031 worktrees: project.read(cx).worktree_metadata_protos(cx),
1032 });
1033 cx.spawn(|this, mut cx| async move {
1034 let response = request.await?;
1035
1036 project.update(&mut cx, |project, cx| {
1037 project.shared(response.project_id, cx)
1038 })?;
1039
1040 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1041 this.update(&mut cx, |this, cx| {
1042 this.shared_projects.insert(project.downgrade());
1043 let active_project = this.local_participant.active_project.as_ref();
1044 if active_project.map_or(false, |location| *location == project) {
1045 this.set_location(Some(&project), cx)
1046 } else {
1047 Task::ready(Ok(()))
1048 }
1049 })
1050 .await?;
1051
1052 Ok(response.project_id)
1053 })
1054 }
1055
1056 pub(crate) fn unshare_project(
1057 &mut self,
1058 project: ModelHandle<Project>,
1059 cx: &mut ModelContext<Self>,
1060 ) -> Result<()> {
1061 let project_id = match project.read(cx).remote_id() {
1062 Some(project_id) => project_id,
1063 None => return Ok(()),
1064 };
1065
1066 self.client.send(proto::UnshareProject { project_id })?;
1067 project.update(cx, |this, cx| this.unshare(cx))
1068 }
1069
1070 pub(crate) fn set_location(
1071 &mut self,
1072 project: Option<&ModelHandle<Project>>,
1073 cx: &mut ModelContext<Self>,
1074 ) -> Task<Result<()>> {
1075 if self.status.is_offline() {
1076 return Task::ready(Err(anyhow!("room is offline")));
1077 }
1078
1079 let client = self.client.clone();
1080 let room_id = self.id;
1081 let location = if let Some(project) = project {
1082 self.local_participant.active_project = Some(project.downgrade());
1083 if let Some(project_id) = project.read(cx).remote_id() {
1084 proto::participant_location::Variant::SharedProject(
1085 proto::participant_location::SharedProject { id: project_id },
1086 )
1087 } else {
1088 proto::participant_location::Variant::UnsharedProject(
1089 proto::participant_location::UnsharedProject {},
1090 )
1091 }
1092 } else {
1093 self.local_participant.active_project = None;
1094 proto::participant_location::Variant::External(proto::participant_location::External {})
1095 };
1096
1097 cx.notify();
1098 cx.foreground().spawn(async move {
1099 client
1100 .request(proto::UpdateParticipantLocation {
1101 room_id,
1102 location: Some(proto::ParticipantLocation {
1103 variant: Some(location),
1104 }),
1105 })
1106 .await?;
1107 Ok(())
1108 })
1109 }
1110
1111 pub fn is_screen_sharing(&self) -> bool {
1112 self.live_kit.as_ref().map_or(false, |live_kit| {
1113 !matches!(live_kit.screen_track, LocalTrack::None)
1114 })
1115 }
1116
1117 pub fn is_sharing_mic(&self) -> bool {
1118 self.live_kit.as_ref().map_or(false, |live_kit| {
1119 !matches!(live_kit.microphone_track, LocalTrack::None)
1120 })
1121 }
1122
1123 pub fn is_muted(&self, cx: &AppContext) -> bool {
1124 self.live_kit
1125 .as_ref()
1126 .and_then(|live_kit| match &live_kit.microphone_track {
1127 LocalTrack::None => Some(settings::get::<CallSettings>(cx).mute_on_join),
1128 LocalTrack::Pending { muted, .. } => Some(*muted),
1129 LocalTrack::Published { muted, .. } => Some(*muted),
1130 })
1131 .unwrap_or(false)
1132 }
1133
1134 pub fn is_speaking(&self) -> bool {
1135 self.live_kit
1136 .as_ref()
1137 .map_or(false, |live_kit| live_kit.speaking)
1138 }
1139
1140 pub fn is_deafened(&self) -> Option<bool> {
1141 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1142 }
1143
1144 #[track_caller]
1145 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1146 if self.status.is_offline() {
1147 return Task::ready(Err(anyhow!("room is offline")));
1148 } else if self.is_sharing_mic() {
1149 return Task::ready(Err(anyhow!("microphone was already shared")));
1150 }
1151
1152 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1153 let publish_id = post_inc(&mut live_kit.next_publish_id);
1154 live_kit.microphone_track = LocalTrack::Pending {
1155 publish_id,
1156 muted: false,
1157 };
1158 cx.notify();
1159 publish_id
1160 } else {
1161 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1162 };
1163
1164 cx.spawn_weak(|this, mut cx| async move {
1165 let publish_track = async {
1166 let track = LocalAudioTrack::create();
1167 this.upgrade(&cx)
1168 .ok_or_else(|| anyhow!("room was dropped"))?
1169 .read_with(&cx, |this, _| {
1170 this.live_kit
1171 .as_ref()
1172 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1173 })
1174 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1175 .await
1176 };
1177
1178 let publication = publish_track.await;
1179 this.upgrade(&cx)
1180 .ok_or_else(|| anyhow!("room was dropped"))?
1181 .update(&mut cx, |this, cx| {
1182 let live_kit = this
1183 .live_kit
1184 .as_mut()
1185 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1186
1187 let (canceled, muted) = if let LocalTrack::Pending {
1188 publish_id: cur_publish_id,
1189 muted,
1190 } = &live_kit.microphone_track
1191 {
1192 (*cur_publish_id != publish_id, *muted)
1193 } else {
1194 (true, false)
1195 };
1196
1197 match publication {
1198 Ok(publication) => {
1199 if canceled {
1200 live_kit.room.unpublish_track(publication);
1201 } else {
1202 if muted {
1203 cx.background().spawn(publication.set_mute(muted)).detach();
1204 }
1205 live_kit.microphone_track = LocalTrack::Published {
1206 track_publication: publication,
1207 muted,
1208 };
1209 cx.notify();
1210 }
1211 Ok(())
1212 }
1213 Err(error) => {
1214 if canceled {
1215 Ok(())
1216 } else {
1217 live_kit.microphone_track = LocalTrack::None;
1218 cx.notify();
1219 Err(error)
1220 }
1221 }
1222 }
1223 })
1224 })
1225 }
1226
1227 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1228 if self.status.is_offline() {
1229 return Task::ready(Err(anyhow!("room is offline")));
1230 } else if self.is_screen_sharing() {
1231 return Task::ready(Err(anyhow!("screen was already shared")));
1232 }
1233
1234 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1235 let publish_id = post_inc(&mut live_kit.next_publish_id);
1236 live_kit.screen_track = LocalTrack::Pending {
1237 publish_id,
1238 muted: false,
1239 };
1240 cx.notify();
1241 (live_kit.room.display_sources(), publish_id)
1242 } else {
1243 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1244 };
1245
1246 cx.spawn_weak(|this, mut cx| async move {
1247 let publish_track = async {
1248 let displays = displays.await?;
1249 let display = displays
1250 .first()
1251 .ok_or_else(|| anyhow!("no display found"))?;
1252 let track = LocalVideoTrack::screen_share_for_display(&display);
1253 this.upgrade(&cx)
1254 .ok_or_else(|| anyhow!("room was dropped"))?
1255 .read_with(&cx, |this, _| {
1256 this.live_kit
1257 .as_ref()
1258 .map(|live_kit| live_kit.room.publish_video_track(&track))
1259 })
1260 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1261 .await
1262 };
1263
1264 let publication = publish_track.await;
1265 this.upgrade(&cx)
1266 .ok_or_else(|| anyhow!("room was dropped"))?
1267 .update(&mut cx, |this, cx| {
1268 let live_kit = this
1269 .live_kit
1270 .as_mut()
1271 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1272
1273 let (canceled, muted) = if let LocalTrack::Pending {
1274 publish_id: cur_publish_id,
1275 muted,
1276 } = &live_kit.screen_track
1277 {
1278 (*cur_publish_id != publish_id, *muted)
1279 } else {
1280 (true, false)
1281 };
1282
1283 match publication {
1284 Ok(publication) => {
1285 if canceled {
1286 live_kit.room.unpublish_track(publication);
1287 } else {
1288 if muted {
1289 cx.background().spawn(publication.set_mute(muted)).detach();
1290 }
1291 live_kit.screen_track = LocalTrack::Published {
1292 track_publication: publication,
1293 muted,
1294 };
1295 cx.notify();
1296 }
1297
1298 Audio::play_sound(Sound::StartScreenshare, cx);
1299
1300 Ok(())
1301 }
1302 Err(error) => {
1303 if canceled {
1304 Ok(())
1305 } else {
1306 live_kit.screen_track = LocalTrack::None;
1307 cx.notify();
1308 Err(error)
1309 }
1310 }
1311 }
1312 })
1313 })
1314 }
1315
1316 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1317 let should_mute = !self.is_muted(cx);
1318 if let Some(live_kit) = self.live_kit.as_mut() {
1319 if matches!(live_kit.microphone_track, LocalTrack::None) {
1320 return Ok(self.share_microphone(cx));
1321 }
1322
1323 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1324 live_kit.muted_by_user = should_mute;
1325
1326 if old_muted == true && live_kit.deafened == true {
1327 if let Some(task) = self.toggle_deafen(cx).ok() {
1328 task.detach();
1329 }
1330 }
1331
1332 Ok(ret_task)
1333 } else {
1334 Err(anyhow!("LiveKit not started"))
1335 }
1336 }
1337
1338 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1339 if let Some(live_kit) = self.live_kit.as_mut() {
1340 (*live_kit).deafened = !live_kit.deafened;
1341
1342 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1343 // Context notification is sent within set_mute itself.
1344 let mut mute_task = None;
1345 // When deafening, mute user's mic as well.
1346 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1347 if live_kit.deafened || !live_kit.muted_by_user {
1348 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1349 };
1350 for participant in self.remote_participants.values() {
1351 for track in live_kit
1352 .room
1353 .remote_audio_track_publications(&participant.user.id.to_string())
1354 {
1355 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1356 }
1357 }
1358
1359 Ok(cx.foreground().spawn(async move {
1360 if let Some(mute_task) = mute_task {
1361 mute_task.await?;
1362 }
1363 for task in tasks {
1364 task.await?;
1365 }
1366 Ok(())
1367 }))
1368 } else {
1369 Err(anyhow!("LiveKit not started"))
1370 }
1371 }
1372
1373 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1374 if self.status.is_offline() {
1375 return Err(anyhow!("room is offline"));
1376 }
1377
1378 let live_kit = self
1379 .live_kit
1380 .as_mut()
1381 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1382 match mem::take(&mut live_kit.screen_track) {
1383 LocalTrack::None => Err(anyhow!("screen was not shared")),
1384 LocalTrack::Pending { .. } => {
1385 cx.notify();
1386 Ok(())
1387 }
1388 LocalTrack::Published {
1389 track_publication, ..
1390 } => {
1391 live_kit.room.unpublish_track(track_publication);
1392 cx.notify();
1393
1394 Audio::play_sound(Sound::StopScreenshare, cx);
1395 Ok(())
1396 }
1397 }
1398 }
1399
1400 #[cfg(any(test, feature = "test-support"))]
1401 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1402 self.live_kit
1403 .as_ref()
1404 .unwrap()
1405 .room
1406 .set_display_sources(sources);
1407 }
1408}
1409
1410struct LiveKitRoom {
1411 room: Arc<live_kit_client::Room>,
1412 screen_track: LocalTrack,
1413 microphone_track: LocalTrack,
1414 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1415 muted_by_user: bool,
1416 deafened: bool,
1417 speaking: bool,
1418 next_publish_id: usize,
1419 _maintain_room: Task<()>,
1420 _maintain_tracks: [Task<()>; 2],
1421}
1422
1423impl LiveKitRoom {
1424 fn set_mute(
1425 self: &mut LiveKitRoom,
1426 should_mute: bool,
1427 cx: &mut ModelContext<Room>,
1428 ) -> Result<(Task<Result<()>>, bool)> {
1429 if !should_mute {
1430 // clear user muting state.
1431 self.muted_by_user = false;
1432 }
1433
1434 let (result, old_muted) = match &mut self.microphone_track {
1435 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1436 LocalTrack::Pending { muted, .. } => {
1437 let old_muted = *muted;
1438 *muted = should_mute;
1439 cx.notify();
1440 Ok((Task::Ready(Some(Ok(()))), old_muted))
1441 }
1442 LocalTrack::Published {
1443 track_publication,
1444 muted,
1445 } => {
1446 let old_muted = *muted;
1447 *muted = should_mute;
1448 cx.notify();
1449 Ok((
1450 cx.background().spawn(track_publication.set_mute(*muted)),
1451 old_muted,
1452 ))
1453 }
1454 }?;
1455
1456 if old_muted != should_mute {
1457 if should_mute {
1458 Audio::play_sound(Sound::Mute, cx);
1459 } else {
1460 Audio::play_sound(Sound::Unmute, cx);
1461 }
1462 }
1463
1464 Ok((result, old_muted))
1465 }
1466}
1467
1468enum LocalTrack {
1469 None,
1470 Pending {
1471 publish_id: usize,
1472 muted: bool,
1473 },
1474 Published {
1475 track_publication: LocalTrackPublication,
1476 muted: bool,
1477 },
1478}
1479
1480impl Default for LocalTrack {
1481 fn default() -> Self {
1482 Self::None
1483 }
1484}
1485
1486#[derive(Copy, Clone, PartialEq, Eq)]
1487pub enum RoomStatus {
1488 Online,
1489 Rejoining,
1490 Offline,
1491}
1492
1493impl RoomStatus {
1494 pub fn is_offline(&self) -> bool {
1495 matches!(self, RoomStatus::Offline)
1496 }
1497
1498 pub fn is_online(&self) -> bool {
1499 matches!(self, RoomStatus::Online)
1500 }
1501}