1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
14use language::LanguageRegistry;
15use live_kit_client::{
16 LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
17 RemoteVideoTrackUpdate,
18};
19use postage::stream::Stream;
20use project::Project;
21use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
22use util::{post_inc, ResultExt, TryFutureExt};
23
/// How long `Room::maintain_connection` waits for the client to reconnect and
/// rejoin the room before it gives up.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
25
26#[derive(Clone, Debug, PartialEq, Eq)]
27pub enum Event {
28 ParticipantLocationChanged {
29 participant_id: proto::PeerId,
30 },
31 RemoteVideoTracksChanged {
32 participant_id: proto::PeerId,
33 },
34 RemoteAudioTracksChanged {
35 participant_id: proto::PeerId,
36 },
37 RemoteProjectShared {
38 owner: Arc<User>,
39 project_id: u64,
40 worktree_root_names: Vec<String>,
41 },
42 RemoteProjectUnshared {
43 project_id: u64,
44 },
45 Left,
46}
47
48pub struct Room {
49 id: u64,
50 live_kit: Option<LiveKitRoom>,
51 status: RoomStatus,
52 shared_projects: HashSet<WeakModelHandle<Project>>,
53 joined_projects: HashSet<WeakModelHandle<Project>>,
54 local_participant: LocalParticipant,
55 remote_participants: BTreeMap<u64, RemoteParticipant>,
56 pending_participants: Vec<Arc<User>>,
57 participant_user_ids: HashSet<u64>,
58 pending_call_count: usize,
59 leave_when_empty: bool,
60 client: Arc<Client>,
61 user_store: ModelHandle<UserStore>,
62 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
63 subscriptions: Vec<client::Subscription>,
64 pending_room_update: Option<Task<()>>,
65 maintain_connection: Option<Task<Option<()>>>,
66}
67
68impl Entity for Room {
69 type Event = Event;
70
71 fn release(&mut self, cx: &mut AppContext) {
72 if self.status.is_online() {
73 self.leave_internal(cx).detach_and_log_err(cx);
74 }
75 }
76
77 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
78 if self.status.is_online() {
79 let leave = self.leave_internal(cx);
80 Some(
81 cx.background()
82 .spawn(async move {
83 leave.await.log_err();
84 })
85 .boxed(),
86 )
87 } else {
88 None
89 }
90 }
91}
92
93impl Room {
94 fn new(
95 id: u64,
96 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
97 client: Arc<Client>,
98 user_store: ModelHandle<UserStore>,
99 cx: &mut ModelContext<Self>,
100 ) -> Self {
101 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
102 let room = live_kit_client::Room::new();
103 let mut status = room.status();
104 // Consume the initial status of the room.
105 let _ = status.try_recv();
106 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
107 while let Some(status) = status.next().await {
108 let this = if let Some(this) = this.upgrade(&cx) {
109 this
110 } else {
111 break;
112 };
113
114 if status == live_kit_client::ConnectionState::Disconnected {
115 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
116 break;
117 }
118 }
119 });
120
121 let mut track_video_changes = room.remote_video_track_updates();
122 let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
123 while let Some(track_change) = track_video_changes.next().await {
124 let this = if let Some(this) = this.upgrade(&cx) {
125 this
126 } else {
127 break;
128 };
129
130 this.update(&mut cx, |this, cx| {
131 this.remote_video_track_updated(track_change, cx).log_err()
132 });
133 }
134 });
135
136 let mut track_audio_changes = room.remote_audio_track_updates();
137 let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
138 while let Some(track_change) = track_audio_changes.next().await {
139 let this = if let Some(this) = this.upgrade(&cx) {
140 this
141 } else {
142 break;
143 };
144
145 this.update(&mut cx, |this, cx| {
146 this.remote_audio_track_updated(track_change, cx).log_err()
147 });
148 }
149 });
150
151 let connect = room.connect(&connection_info.server_url, &connection_info.token);
152 cx.spawn(|this, mut cx| async move {
153 connect.await?;
154 this.update(&mut cx, |this, cx| this.share_microphone(cx))
155 .await?;
156
157 anyhow::Ok(())
158 })
159 .detach_and_log_err(cx);
160
161 Some(LiveKitRoom {
162 room,
163 screen_track: LocalTrack::None,
164 microphone_track: LocalTrack::None,
165 next_publish_id: 0,
166 muted_by_user: false,
167 deafened: false,
168 speaking: false,
169 _maintain_room,
170 _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
171 })
172 } else {
173 None
174 };
175
176 let maintain_connection =
177 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
178
179 Self {
180 id,
181 live_kit: live_kit_room,
182 status: RoomStatus::Online,
183 shared_projects: Default::default(),
184 joined_projects: Default::default(),
185 participant_user_ids: Default::default(),
186 local_participant: Default::default(),
187 remote_participants: Default::default(),
188 pending_participants: Default::default(),
189 pending_call_count: 0,
190 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
191 leave_when_empty: false,
192 pending_room_update: None,
193 client,
194 user_store,
195 follows_by_leader_id_project_id: Default::default(),
196 maintain_connection: Some(maintain_connection),
197 }
198 }
199
200 pub(crate) fn create(
201 called_user_id: u64,
202 initial_project: Option<ModelHandle<Project>>,
203 client: Arc<Client>,
204 user_store: ModelHandle<UserStore>,
205 cx: &mut AppContext,
206 ) -> Task<Result<ModelHandle<Self>>> {
207 cx.spawn(|mut cx| async move {
208 let response = client.request(proto::CreateRoom {}).await?;
209 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
210 let room = cx.add_model(|cx| {
211 Self::new(
212 room_proto.id,
213 response.live_kit_connection_info,
214 client,
215 user_store,
216 cx,
217 )
218 });
219
220 let initial_project_id = if let Some(initial_project) = initial_project {
221 let initial_project_id = room
222 .update(&mut cx, |room, cx| {
223 room.share_project(initial_project.clone(), cx)
224 })
225 .await?;
226 Some(initial_project_id)
227 } else {
228 None
229 };
230
231 match room
232 .update(&mut cx, |room, cx| {
233 room.leave_when_empty = true;
234 room.call(called_user_id, initial_project_id, cx)
235 })
236 .await
237 {
238 Ok(()) => Ok(room),
239 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
240 }
241 })
242 }
243
244 pub(crate) fn join(
245 call: &IncomingCall,
246 client: Arc<Client>,
247 user_store: ModelHandle<UserStore>,
248 cx: &mut AppContext,
249 ) -> Task<Result<ModelHandle<Self>>> {
250 let room_id = call.room_id;
251 cx.spawn(|mut cx| async move {
252 let response = client.request(proto::JoinRoom { id: room_id }).await?;
253 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
254 let room = cx.add_model(|cx| {
255 Self::new(
256 room_id,
257 response.live_kit_connection_info,
258 client,
259 user_store,
260 cx,
261 )
262 });
263 room.update(&mut cx, |room, cx| {
264 room.leave_when_empty = true;
265 room.apply_room_update(room_proto, cx)?;
266 anyhow::Ok(())
267 })?;
268 Ok(room)
269 })
270 }
271
272 fn should_leave(&self) -> bool {
273 self.leave_when_empty
274 && self.pending_room_update.is_none()
275 && self.pending_participants.is_empty()
276 && self.remote_participants.is_empty()
277 && self.pending_call_count == 0
278 }
279
280 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
281 cx.notify();
282 cx.emit(Event::Left);
283 self.leave_internal(cx)
284 }
285
286 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
287 if self.status.is_offline() {
288 return Task::ready(Err(anyhow!("room is offline")));
289 }
290
291 log::info!("leaving room");
292
293 for project in self.shared_projects.drain() {
294 if let Some(project) = project.upgrade(cx) {
295 project.update(cx, |project, cx| {
296 project.unshare(cx).log_err();
297 });
298 }
299 }
300 for project in self.joined_projects.drain() {
301 if let Some(project) = project.upgrade(cx) {
302 project.update(cx, |project, cx| {
303 project.disconnected_from_host(cx);
304 project.close(cx);
305 });
306 }
307 }
308
309 self.status = RoomStatus::Offline;
310 self.remote_participants.clear();
311 self.pending_participants.clear();
312 self.participant_user_ids.clear();
313 self.subscriptions.clear();
314 self.live_kit.take();
315 self.pending_room_update.take();
316 self.maintain_connection.take();
317
318 let leave_room = self.client.request(proto::LeaveRoom {});
319 cx.background().spawn(async move {
320 leave_room.await?;
321 anyhow::Ok(())
322 })
323 }
324
325 async fn maintain_connection(
326 this: WeakModelHandle<Self>,
327 client: Arc<Client>,
328 mut cx: AsyncAppContext,
329 ) -> Result<()> {
330 let mut client_status = client.status();
331 loop {
332 let _ = client_status.try_recv();
333 let is_connected = client_status.borrow().is_connected();
334 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
335 if !is_connected || client_status.next().await.is_some() {
336 log::info!("detected client disconnection");
337
338 this.upgrade(&cx)
339 .ok_or_else(|| anyhow!("room was dropped"))?
340 .update(&mut cx, |this, cx| {
341 this.status = RoomStatus::Rejoining;
342 cx.notify();
343 });
344
345 // Wait for client to re-establish a connection to the server.
346 {
347 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
348 let client_reconnection = async {
349 let mut remaining_attempts = 3;
350 while remaining_attempts > 0 {
351 if client_status.borrow().is_connected() {
352 log::info!("client reconnected, attempting to rejoin room");
353
354 let Some(this) = this.upgrade(&cx) else { break };
355 if this
356 .update(&mut cx, |this, cx| this.rejoin(cx))
357 .await
358 .log_err()
359 .is_some()
360 {
361 return true;
362 } else {
363 remaining_attempts -= 1;
364 }
365 } else if client_status.borrow().is_signed_out() {
366 return false;
367 }
368
369 log::info!(
370 "waiting for client status change, remaining attempts {}",
371 remaining_attempts
372 );
373 client_status.next().await;
374 }
375 false
376 }
377 .fuse();
378 futures::pin_mut!(client_reconnection);
379
380 futures::select_biased! {
381 reconnected = client_reconnection => {
382 if reconnected {
383 log::info!("successfully reconnected to room");
384 // If we successfully joined the room, go back around the loop
385 // waiting for future connection status changes.
386 continue;
387 }
388 }
389 _ = reconnection_timeout => {
390 log::info!("room reconnection timeout expired");
391 }
392 }
393 }
394
395 break;
396 }
397 }
398
399 // The client failed to re-establish a connection to the server
400 // or an error occurred while trying to re-join the room. Either way
401 // we leave the room and return an error.
402 if let Some(this) = this.upgrade(&cx) {
403 log::info!("reconnection failed, leaving room");
404 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
405 }
406 Err(anyhow!(
407 "can't reconnect to room: client failed to re-establish connection"
408 ))
409 }
410
411 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
412 let mut projects = HashMap::default();
413 let mut reshared_projects = Vec::new();
414 let mut rejoined_projects = Vec::new();
415 self.shared_projects.retain(|project| {
416 if let Some(handle) = project.upgrade(cx) {
417 let project = handle.read(cx);
418 if let Some(project_id) = project.remote_id() {
419 projects.insert(project_id, handle.clone());
420 reshared_projects.push(proto::UpdateProject {
421 project_id,
422 worktrees: project.worktree_metadata_protos(cx),
423 });
424 return true;
425 }
426 }
427 false
428 });
429 self.joined_projects.retain(|project| {
430 if let Some(handle) = project.upgrade(cx) {
431 let project = handle.read(cx);
432 if let Some(project_id) = project.remote_id() {
433 projects.insert(project_id, handle.clone());
434 rejoined_projects.push(proto::RejoinProject {
435 id: project_id,
436 worktrees: project
437 .worktrees(cx)
438 .map(|worktree| {
439 let worktree = worktree.read(cx);
440 proto::RejoinWorktree {
441 id: worktree.id().to_proto(),
442 scan_id: worktree.completed_scan_id() as u64,
443 }
444 })
445 .collect(),
446 });
447 }
448 return true;
449 }
450 false
451 });
452
453 let response = self.client.request_envelope(proto::RejoinRoom {
454 id: self.id,
455 reshared_projects,
456 rejoined_projects,
457 });
458
459 cx.spawn(|this, mut cx| async move {
460 let response = response.await?;
461 let message_id = response.message_id;
462 let response = response.payload;
463 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
464 this.update(&mut cx, |this, cx| {
465 this.status = RoomStatus::Online;
466 this.apply_room_update(room_proto, cx)?;
467
468 for reshared_project in response.reshared_projects {
469 if let Some(project) = projects.get(&reshared_project.id) {
470 project.update(cx, |project, cx| {
471 project.reshared(reshared_project, cx).log_err();
472 });
473 }
474 }
475
476 for rejoined_project in response.rejoined_projects {
477 if let Some(project) = projects.get(&rejoined_project.id) {
478 project.update(cx, |project, cx| {
479 project.rejoined(rejoined_project, message_id, cx).log_err();
480 });
481 }
482 }
483
484 anyhow::Ok(())
485 })
486 })
487 }
488
489 pub fn id(&self) -> u64 {
490 self.id
491 }
492
493 pub fn status(&self) -> RoomStatus {
494 self.status
495 }
496
497 pub fn local_participant(&self) -> &LocalParticipant {
498 &self.local_participant
499 }
500
501 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
502 &self.remote_participants
503 }
504
505 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
506 self.remote_participants
507 .values()
508 .find(|p| p.peer_id == peer_id)
509 }
510
511 pub fn pending_participants(&self) -> &[Arc<User>] {
512 &self.pending_participants
513 }
514
515 pub fn contains_participant(&self, user_id: u64) -> bool {
516 self.participant_user_ids.contains(&user_id)
517 }
518
519 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
520 self.follows_by_leader_id_project_id
521 .get(&(leader_id, project_id))
522 .map_or(&[], |v| v.as_slice())
523 }
524
525 async fn handle_room_updated(
526 this: ModelHandle<Self>,
527 envelope: TypedEnvelope<proto::RoomUpdated>,
528 _: Arc<Client>,
529 mut cx: AsyncAppContext,
530 ) -> Result<()> {
531 let room = envelope
532 .payload
533 .room
534 .ok_or_else(|| anyhow!("invalid room"))?;
535 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
536 }
537
538 fn apply_room_update(
539 &mut self,
540 mut room: proto::Room,
541 cx: &mut ModelContext<Self>,
542 ) -> Result<()> {
543 // Filter ourselves out from the room's participants.
544 let local_participant_ix = room
545 .participants
546 .iter()
547 .position(|participant| Some(participant.user_id) == self.client.user_id());
548 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
549
550 let pending_participant_user_ids = room
551 .pending_participants
552 .iter()
553 .map(|p| p.user_id)
554 .collect::<Vec<_>>();
555
556 let remote_participant_user_ids = room
557 .participants
558 .iter()
559 .map(|p| p.user_id)
560 .collect::<Vec<_>>();
561
562 let (remote_participants, pending_participants) =
563 self.user_store.update(cx, move |user_store, cx| {
564 (
565 user_store.get_users(remote_participant_user_ids, cx),
566 user_store.get_users(pending_participant_user_ids, cx),
567 )
568 });
569
570 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
571 let (remote_participants, pending_participants) =
572 futures::join!(remote_participants, pending_participants);
573
574 this.update(&mut cx, |this, cx| {
575 this.participant_user_ids.clear();
576
577 if let Some(participant) = local_participant {
578 this.local_participant.projects = participant.projects;
579 } else {
580 this.local_participant.projects.clear();
581 }
582
583 if let Some(participants) = remote_participants.log_err() {
584 for (participant, user) in room.participants.into_iter().zip(participants) {
585 let Some(peer_id) = participant.peer_id else { continue };
586 this.participant_user_ids.insert(participant.user_id);
587
588 let old_projects = this
589 .remote_participants
590 .get(&participant.user_id)
591 .into_iter()
592 .flat_map(|existing| &existing.projects)
593 .map(|project| project.id)
594 .collect::<HashSet<_>>();
595 let new_projects = participant
596 .projects
597 .iter()
598 .map(|project| project.id)
599 .collect::<HashSet<_>>();
600
601 for project in &participant.projects {
602 if !old_projects.contains(&project.id) {
603 cx.emit(Event::RemoteProjectShared {
604 owner: user.clone(),
605 project_id: project.id,
606 worktree_root_names: project.worktree_root_names.clone(),
607 });
608 }
609 }
610
611 for unshared_project_id in old_projects.difference(&new_projects) {
612 this.joined_projects.retain(|project| {
613 if let Some(project) = project.upgrade(cx) {
614 project.update(cx, |project, cx| {
615 if project.remote_id() == Some(*unshared_project_id) {
616 project.disconnected_from_host(cx);
617 false
618 } else {
619 true
620 }
621 })
622 } else {
623 false
624 }
625 });
626 cx.emit(Event::RemoteProjectUnshared {
627 project_id: *unshared_project_id,
628 });
629 }
630
631 let location = ParticipantLocation::from_proto(participant.location)
632 .unwrap_or(ParticipantLocation::External);
633 if let Some(remote_participant) =
634 this.remote_participants.get_mut(&participant.user_id)
635 {
636 remote_participant.projects = participant.projects;
637 remote_participant.peer_id = peer_id;
638 if location != remote_participant.location {
639 remote_participant.location = location;
640 cx.emit(Event::ParticipantLocationChanged {
641 participant_id: peer_id,
642 });
643 }
644 } else {
645 this.remote_participants.insert(
646 participant.user_id,
647 RemoteParticipant {
648 user: user.clone(),
649 peer_id,
650 projects: participant.projects,
651 location,
652 muted: false,
653 speaking: false,
654 video_tracks: Default::default(),
655 audio_tracks: Default::default(),
656 },
657 );
658
659 if let Some(live_kit) = this.live_kit.as_ref() {
660 let video_tracks =
661 live_kit.room.remote_video_tracks(&user.id.to_string());
662 let audio_tracks =
663 live_kit.room.remote_audio_tracks(&user.id.to_string());
664 for track in video_tracks {
665 this.remote_video_track_updated(
666 RemoteVideoTrackUpdate::Subscribed(track),
667 cx,
668 )
669 .log_err();
670 }
671 for track in audio_tracks {
672 this.remote_audio_track_updated(
673 RemoteAudioTrackUpdate::Subscribed(track),
674 cx,
675 )
676 .log_err();
677 }
678 }
679 }
680 }
681
682 this.remote_participants.retain(|user_id, participant| {
683 if this.participant_user_ids.contains(user_id) {
684 true
685 } else {
686 for project in &participant.projects {
687 cx.emit(Event::RemoteProjectUnshared {
688 project_id: project.id,
689 });
690 }
691 false
692 }
693 });
694 }
695
696 if let Some(pending_participants) = pending_participants.log_err() {
697 this.pending_participants = pending_participants;
698 for participant in &this.pending_participants {
699 this.participant_user_ids.insert(participant.id);
700 }
701 }
702
703 this.follows_by_leader_id_project_id.clear();
704 for follower in room.followers {
705 let project_id = follower.project_id;
706 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
707 (Some(leader), Some(follower)) => (leader, follower),
708
709 _ => {
710 log::error!("Follower message {follower:?} missing some state");
711 continue;
712 }
713 };
714
715 let list = this
716 .follows_by_leader_id_project_id
717 .entry((leader, project_id))
718 .or_insert(Vec::new());
719 if !list.contains(&follower) {
720 list.push(follower);
721 }
722 }
723
724 this.pending_room_update.take();
725 if this.should_leave() {
726 log::info!("room is empty, leaving");
727 let _ = this.leave(cx);
728 }
729
730 this.check_invariants();
731 cx.notify();
732 });
733 }));
734
735 cx.notify();
736 Ok(())
737 }
738
739 fn remote_video_track_updated(
740 &mut self,
741 change: RemoteVideoTrackUpdate,
742 cx: &mut ModelContext<Self>,
743 ) -> Result<()> {
744 match change {
745 RemoteVideoTrackUpdate::Subscribed(track) => {
746 let user_id = track.publisher_id().parse()?;
747 let track_id = track.sid().to_string();
748 let participant = self
749 .remote_participants
750 .get_mut(&user_id)
751 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
752 participant.video_tracks.insert(
753 track_id.clone(),
754 Arc::new(RemoteVideoTrack {
755 live_kit_track: track,
756 }),
757 );
758 cx.emit(Event::RemoteVideoTracksChanged {
759 participant_id: participant.peer_id,
760 });
761 }
762 RemoteVideoTrackUpdate::Unsubscribed {
763 publisher_id,
764 track_id,
765 } => {
766 let user_id = publisher_id.parse()?;
767 let participant = self
768 .remote_participants
769 .get_mut(&user_id)
770 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
771 participant.video_tracks.remove(&track_id);
772 cx.emit(Event::RemoteVideoTracksChanged {
773 participant_id: participant.peer_id,
774 });
775 }
776 }
777
778 cx.notify();
779 Ok(())
780 }
781
782 fn remote_audio_track_updated(
783 &mut self,
784 change: RemoteAudioTrackUpdate,
785 cx: &mut ModelContext<Self>,
786 ) -> Result<()> {
787 match change {
788 RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
789 let mut speaker_ids = speakers
790 .into_iter()
791 .filter_map(|speaker_sid| speaker_sid.parse().ok())
792 .collect::<Vec<u64>>();
793 speaker_ids.sort_unstable();
794 for (sid, participant) in &mut self.remote_participants {
795 if let Ok(_) = speaker_ids.binary_search(sid) {
796 participant.speaking = true;
797 } else {
798 participant.speaking = false;
799 }
800 }
801 if let Some(id) = self.client.user_id() {
802 if let Some(room) = &mut self.live_kit {
803 if let Ok(_) = speaker_ids.binary_search(&id) {
804 room.speaking = true;
805 } else {
806 room.speaking = false;
807 }
808 }
809 }
810 cx.notify();
811 }
812 RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
813 for participant in &mut self.remote_participants.values_mut() {
814 let mut found = false;
815 for track in participant.audio_tracks.values() {
816 if track.sid() == track_id {
817 found = true;
818 break;
819 }
820 }
821 if found {
822 participant.muted = muted;
823 break;
824 }
825 }
826 cx.notify();
827 }
828 RemoteAudioTrackUpdate::Subscribed(track) => {
829 let user_id = track.publisher_id().parse()?;
830 let track_id = track.sid().to_string();
831 let participant = self
832 .remote_participants
833 .get_mut(&user_id)
834 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
835 participant.audio_tracks.insert(track_id.clone(), track);
836 cx.emit(Event::RemoteAudioTracksChanged {
837 participant_id: participant.peer_id,
838 });
839 }
840 RemoteAudioTrackUpdate::Unsubscribed {
841 publisher_id,
842 track_id,
843 } => {
844 let user_id = publisher_id.parse()?;
845 let participant = self
846 .remote_participants
847 .get_mut(&user_id)
848 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
849 participant.audio_tracks.remove(&track_id);
850 cx.emit(Event::RemoteAudioTracksChanged {
851 participant_id: participant.peer_id,
852 });
853 }
854 }
855
856 cx.notify();
857 Ok(())
858 }
859
860 fn check_invariants(&self) {
861 #[cfg(any(test, feature = "test-support"))]
862 {
863 for participant in self.remote_participants.values() {
864 assert!(self.participant_user_ids.contains(&participant.user.id));
865 assert_ne!(participant.user.id, self.client.user_id().unwrap());
866 }
867
868 for participant in &self.pending_participants {
869 assert!(self.participant_user_ids.contains(&participant.id));
870 assert_ne!(participant.id, self.client.user_id().unwrap());
871 }
872
873 assert_eq!(
874 self.participant_user_ids.len(),
875 self.remote_participants.len() + self.pending_participants.len()
876 );
877 }
878 }
879
880 pub(crate) fn call(
881 &mut self,
882 called_user_id: u64,
883 initial_project_id: Option<u64>,
884 cx: &mut ModelContext<Self>,
885 ) -> Task<Result<()>> {
886 if self.status.is_offline() {
887 return Task::ready(Err(anyhow!("room is offline")));
888 }
889
890 cx.notify();
891 let client = self.client.clone();
892 let room_id = self.id;
893 self.pending_call_count += 1;
894 cx.spawn(|this, mut cx| async move {
895 let result = client
896 .request(proto::Call {
897 room_id,
898 called_user_id,
899 initial_project_id,
900 })
901 .await;
902 this.update(&mut cx, |this, cx| {
903 this.pending_call_count -= 1;
904 if this.should_leave() {
905 this.leave(cx).detach_and_log_err(cx);
906 }
907 });
908 result?;
909 Ok(())
910 })
911 }
912
913 pub fn join_project(
914 &mut self,
915 id: u64,
916 language_registry: Arc<LanguageRegistry>,
917 fs: Arc<dyn Fs>,
918 cx: &mut ModelContext<Self>,
919 ) -> Task<Result<ModelHandle<Project>>> {
920 let client = self.client.clone();
921 let user_store = self.user_store.clone();
922 cx.spawn(|this, mut cx| async move {
923 let project =
924 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
925 this.update(&mut cx, |this, cx| {
926 this.joined_projects.retain(|project| {
927 if let Some(project) = project.upgrade(cx) {
928 !project.read(cx).is_read_only()
929 } else {
930 false
931 }
932 });
933 this.joined_projects.insert(project.downgrade());
934 });
935 Ok(project)
936 })
937 }
938
939 pub(crate) fn share_project(
940 &mut self,
941 project: ModelHandle<Project>,
942 cx: &mut ModelContext<Self>,
943 ) -> Task<Result<u64>> {
944 if let Some(project_id) = project.read(cx).remote_id() {
945 return Task::ready(Ok(project_id));
946 }
947
948 let request = self.client.request(proto::ShareProject {
949 room_id: self.id(),
950 worktrees: project.read(cx).worktree_metadata_protos(cx),
951 });
952 cx.spawn(|this, mut cx| async move {
953 let response = request.await?;
954
955 project.update(&mut cx, |project, cx| {
956 project.shared(response.project_id, cx)
957 })?;
958
959 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
960 this.update(&mut cx, |this, cx| {
961 this.shared_projects.insert(project.downgrade());
962 let active_project = this.local_participant.active_project.as_ref();
963 if active_project.map_or(false, |location| *location == project) {
964 this.set_location(Some(&project), cx)
965 } else {
966 Task::ready(Ok(()))
967 }
968 })
969 .await?;
970
971 Ok(response.project_id)
972 })
973 }
974
975 pub(crate) fn unshare_project(
976 &mut self,
977 project: ModelHandle<Project>,
978 cx: &mut ModelContext<Self>,
979 ) -> Result<()> {
980 let project_id = match project.read(cx).remote_id() {
981 Some(project_id) => project_id,
982 None => return Ok(()),
983 };
984
985 self.client.send(proto::UnshareProject { project_id })?;
986 project.update(cx, |this, cx| this.unshare(cx))
987 }
988
989 pub(crate) fn set_location(
990 &mut self,
991 project: Option<&ModelHandle<Project>>,
992 cx: &mut ModelContext<Self>,
993 ) -> Task<Result<()>> {
994 if self.status.is_offline() {
995 return Task::ready(Err(anyhow!("room is offline")));
996 }
997
998 let client = self.client.clone();
999 let room_id = self.id;
1000 let location = if let Some(project) = project {
1001 self.local_participant.active_project = Some(project.downgrade());
1002 if let Some(project_id) = project.read(cx).remote_id() {
1003 proto::participant_location::Variant::SharedProject(
1004 proto::participant_location::SharedProject { id: project_id },
1005 )
1006 } else {
1007 proto::participant_location::Variant::UnsharedProject(
1008 proto::participant_location::UnsharedProject {},
1009 )
1010 }
1011 } else {
1012 self.local_participant.active_project = None;
1013 proto::participant_location::Variant::External(proto::participant_location::External {})
1014 };
1015
1016 cx.notify();
1017 cx.foreground().spawn(async move {
1018 client
1019 .request(proto::UpdateParticipantLocation {
1020 room_id,
1021 location: Some(proto::ParticipantLocation {
1022 variant: Some(location),
1023 }),
1024 })
1025 .await?;
1026 Ok(())
1027 })
1028 }
1029
1030 pub fn is_screen_sharing(&self) -> bool {
1031 self.live_kit.as_ref().map_or(false, |live_kit| {
1032 !matches!(live_kit.screen_track, LocalTrack::None)
1033 })
1034 }
1035
1036 pub fn is_sharing_mic(&self) -> bool {
1037 self.live_kit.as_ref().map_or(false, |live_kit| {
1038 !matches!(live_kit.microphone_track, LocalTrack::None)
1039 })
1040 }
1041
1042 pub fn is_muted(&self) -> bool {
1043 self.live_kit
1044 .as_ref()
1045 .and_then(|live_kit| match &live_kit.microphone_track {
1046 LocalTrack::None => None,
1047 LocalTrack::Pending { muted, .. } => Some(*muted),
1048 LocalTrack::Published { muted, .. } => Some(*muted),
1049 })
1050 .unwrap_or(false)
1051 }
1052
1053 pub fn is_speaking(&self) -> bool {
1054 self.live_kit
1055 .as_ref()
1056 .map_or(false, |live_kit| live_kit.speaking)
1057 }
1058
1059 pub fn is_deafened(&self) -> Option<bool> {
1060 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1061 }
1062
1063 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1064 if self.status.is_offline() {
1065 return Task::ready(Err(anyhow!("room is offline")));
1066 } else if self.is_sharing_mic() {
1067 return Task::ready(Err(anyhow!("microphone was already shared")));
1068 }
1069
1070 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1071 let publish_id = post_inc(&mut live_kit.next_publish_id);
1072 live_kit.microphone_track = LocalTrack::Pending {
1073 publish_id,
1074 muted: false,
1075 };
1076 cx.notify();
1077 publish_id
1078 } else {
1079 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1080 };
1081
1082 cx.spawn_weak(|this, mut cx| async move {
1083 let publish_track = async {
1084 let track = LocalAudioTrack::create();
1085 this.upgrade(&cx)
1086 .ok_or_else(|| anyhow!("room was dropped"))?
1087 .read_with(&cx, |this, _| {
1088 this.live_kit
1089 .as_ref()
1090 .map(|live_kit| live_kit.room.publish_audio_track(&track))
1091 })
1092 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1093 .await
1094 };
1095
1096 let publication = publish_track.await;
1097 this.upgrade(&cx)
1098 .ok_or_else(|| anyhow!("room was dropped"))?
1099 .update(&mut cx, |this, cx| {
1100 let live_kit = this
1101 .live_kit
1102 .as_mut()
1103 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1104
1105 let (canceled, muted) = if let LocalTrack::Pending {
1106 publish_id: cur_publish_id,
1107 muted,
1108 } = &live_kit.microphone_track
1109 {
1110 (*cur_publish_id != publish_id, *muted)
1111 } else {
1112 (true, false)
1113 };
1114
1115 match publication {
1116 Ok(publication) => {
1117 if canceled {
1118 live_kit.room.unpublish_track(publication);
1119 } else {
1120 if muted {
1121 cx.background().spawn(publication.set_mute(muted)).detach();
1122 }
1123 live_kit.microphone_track = LocalTrack::Published {
1124 track_publication: publication,
1125 muted,
1126 };
1127 cx.notify();
1128 }
1129 Ok(())
1130 }
1131 Err(error) => {
1132 if canceled {
1133 Ok(())
1134 } else {
1135 live_kit.microphone_track = LocalTrack::None;
1136 cx.notify();
1137 Err(error)
1138 }
1139 }
1140 }
1141 })
1142 })
1143 }
1144
1145 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1146 if self.status.is_offline() {
1147 return Task::ready(Err(anyhow!("room is offline")));
1148 } else if self.is_screen_sharing() {
1149 return Task::ready(Err(anyhow!("screen was already shared")));
1150 }
1151
1152 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1153 let publish_id = post_inc(&mut live_kit.next_publish_id);
1154 live_kit.screen_track = LocalTrack::Pending {
1155 publish_id,
1156 muted: false,
1157 };
1158 cx.notify();
1159 (live_kit.room.display_sources(), publish_id)
1160 } else {
1161 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1162 };
1163
1164 cx.spawn_weak(|this, mut cx| async move {
1165 let publish_track = async {
1166 let displays = displays.await?;
1167 let display = displays
1168 .first()
1169 .ok_or_else(|| anyhow!("no display found"))?;
1170 let track = LocalVideoTrack::screen_share_for_display(&display);
1171 this.upgrade(&cx)
1172 .ok_or_else(|| anyhow!("room was dropped"))?
1173 .read_with(&cx, |this, _| {
1174 this.live_kit
1175 .as_ref()
1176 .map(|live_kit| live_kit.room.publish_video_track(&track))
1177 })
1178 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1179 .await
1180 };
1181
1182 let publication = publish_track.await;
1183 this.upgrade(&cx)
1184 .ok_or_else(|| anyhow!("room was dropped"))?
1185 .update(&mut cx, |this, cx| {
1186 let live_kit = this
1187 .live_kit
1188 .as_mut()
1189 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1190
1191 let (canceled, muted) = if let LocalTrack::Pending {
1192 publish_id: cur_publish_id,
1193 muted,
1194 } = &live_kit.screen_track
1195 {
1196 (*cur_publish_id != publish_id, *muted)
1197 } else {
1198 (true, false)
1199 };
1200
1201 match publication {
1202 Ok(publication) => {
1203 if canceled {
1204 live_kit.room.unpublish_track(publication);
1205 } else {
1206 if muted {
1207 cx.background().spawn(publication.set_mute(muted)).detach();
1208 }
1209 live_kit.screen_track = LocalTrack::Published {
1210 track_publication: publication,
1211 muted,
1212 };
1213 cx.notify();
1214 }
1215 Ok(())
1216 }
1217 Err(error) => {
1218 if canceled {
1219 Ok(())
1220 } else {
1221 live_kit.screen_track = LocalTrack::None;
1222 cx.notify();
1223 Err(error)
1224 }
1225 }
1226 }
1227 })
1228 })
1229 }
1230 fn set_mute(
1231 live_kit: &mut LiveKitRoom,
1232 should_mute: bool,
1233 cx: &mut ModelContext<Self>,
1234 ) -> Result<Task<Result<()>>> {
1235 if !should_mute {
1236 // clear user muting state.
1237 live_kit.muted_by_user = false;
1238 }
1239 match &mut live_kit.microphone_track {
1240 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1241 LocalTrack::Pending { muted, .. } => {
1242 *muted = should_mute;
1243 cx.notify();
1244 Ok(Task::Ready(Some(Ok(()))))
1245 }
1246 LocalTrack::Published {
1247 track_publication,
1248 muted,
1249 } => {
1250 *muted = should_mute;
1251 cx.notify();
1252 Ok(cx.background().spawn(track_publication.set_mute(*muted)))
1253 }
1254 }
1255 }
1256 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1257 let should_mute = !self.is_muted();
1258 if let Some(live_kit) = self.live_kit.as_mut() {
1259 let ret = Self::set_mute(live_kit, should_mute, cx);
1260 live_kit.muted_by_user = should_mute;
1261 ret
1262 } else {
1263 Err(anyhow!("LiveKit not started"))
1264 }
1265 }
1266
1267 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1268 if let Some(live_kit) = self.live_kit.as_mut() {
1269 (*live_kit).deafened = !live_kit.deafened;
1270
1271 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1272 // Context notification is sent within set_mute itself.
1273 let mut mute_task = None;
1274 // When deafening, mute user's mic as well.
1275 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1276 if live_kit.deafened || !live_kit.muted_by_user {
1277 mute_task = Some(Self::set_mute(live_kit, live_kit.deafened, cx)?);
1278 };
1279 for participant in self.remote_participants.values() {
1280 for track in live_kit
1281 .room
1282 .remote_audio_track_publications(&participant.user.id.to_string())
1283 {
1284 tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1285 }
1286 }
1287
1288 Ok(cx.foreground().spawn(async move {
1289 if let Some(mute_task) = mute_task {
1290 mute_task.await?;
1291 }
1292 for task in tasks {
1293 task.await?;
1294 }
1295 Ok(())
1296 }))
1297 } else {
1298 Err(anyhow!("LiveKit not started"))
1299 }
1300 }
1301
1302 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1303 if self.status.is_offline() {
1304 return Err(anyhow!("room is offline"));
1305 }
1306
1307 let live_kit = self
1308 .live_kit
1309 .as_mut()
1310 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1311 match mem::take(&mut live_kit.screen_track) {
1312 LocalTrack::None => Err(anyhow!("screen was not shared")),
1313 LocalTrack::Pending { .. } => {
1314 cx.notify();
1315 Ok(())
1316 }
1317 LocalTrack::Published {
1318 track_publication, ..
1319 } => {
1320 live_kit.room.unpublish_track(track_publication);
1321 cx.notify();
1322 Ok(())
1323 }
1324 }
1325 }
1326
/// Test helper that forwards `sources` to the underlying LiveKit room.
///
/// # Panics
///
/// Panics if LiveKit has not been initialized for this room.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1335}
1336
/// LiveKit-related state owned by a `Room`: the LiveKit room handle plus the local
/// participant's track and mute/deafen bookkeeping.
struct LiveKitRoom {
    /// Shared handle to the LiveKit room that tracks are published to and unpublished from.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    ///
    /// Set by `toggle_mute` and cleared whenever `set_mute` is asked to unmute.
    muted_by_user: bool,
    /// Flipped by `toggle_deafen`; while `true`, remote audio track publications are disabled.
    deafened: bool,
    // NOTE(review): not written anywhere in this section; presumably reflects whether the
    // local participant is currently speaking — confirm against the code that updates it.
    speaking: bool,
    /// Monotonic counter (advanced with `post_inc`) used to tag each pending publish attempt
    /// so that a stale completion can be recognized.
    next_publish_id: usize,
    // NOTE(review): the two task fields below are never read in this section; presumably they
    // are stored only to keep the background work alive as long as this struct — confirm.
    _maintain_room: Task<()>,
    _maintain_tracks: [Task<()>; 2],
}
1349
/// Publication state of one of the local participant's tracks (microphone or screen).
enum LocalTrack {
    /// No track is published and no publish attempt is in flight.
    None,
    /// A publish attempt has been started but has not completed yet.
    Pending {
        /// Id of the attempt; a completion whose id no longer matches is treated as canceled.
        publish_id: usize,
        /// Mute state requested while pending; applied to the publication once it succeeds.
        muted: bool,
    },
    /// The track has been published to the LiveKit room.
    Published {
        /// Handle to the publication, used to mute/unmute it or to unpublish it.
        track_publication: LocalTrackPublication,
        /// Mute state last recorded for this publication.
        muted: bool,
    },
}
1361
1362impl Default for LocalTrack {
1363 fn default() -> Self {
1364 Self::None
1365 }
1366}
1367
/// Connection state of a room.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not count as online.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}