1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{
14 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
15};
16use language::LanguageRegistry;
17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
18use postage::stream::Stream;
19use project::Project;
20use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub enum Event {
27 ParticipantLocationChanged {
28 participant_id: proto::PeerId,
29 },
30 RemoteVideoTracksChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteProjectShared {
34 owner: Arc<User>,
35 project_id: u64,
36 worktree_root_names: Vec<String>,
37 },
38 RemoteProjectUnshared {
39 project_id: u64,
40 },
41 Left,
42}
43
44pub struct Room {
45 id: u64,
46 live_kit: Option<LiveKitRoom>,
47 status: RoomStatus,
48 shared_projects: HashSet<WeakModelHandle<Project>>,
49 joined_projects: HashSet<WeakModelHandle<Project>>,
50 local_participant: LocalParticipant,
51 remote_participants: BTreeMap<u64, RemoteParticipant>,
52 pending_participants: Vec<Arc<User>>,
53 participant_user_ids: HashSet<u64>,
54 pending_call_count: usize,
55 leave_when_empty: bool,
56 client: Arc<Client>,
57 user_store: ModelHandle<UserStore>,
58 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
59 subscriptions: Vec<client::Subscription>,
60 pending_room_update: Option<Task<()>>,
61 maintain_connection: Option<Task<Option<()>>>,
62}
63
64impl Entity for Room {
65 type Event = Event;
66
67 fn release(&mut self, cx: &mut MutableAppContext) {
68 if self.status.is_online() {
69 self.leave_internal(cx).detach_and_log_err(cx);
70 }
71 }
72
73 fn app_will_quit(
74 &mut self,
75 cx: &mut MutableAppContext,
76 ) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
77 if self.status.is_online() {
78 let leave = self.leave_internal(cx);
79 Some(
80 cx.background()
81 .spawn(async move {
82 leave.await.log_err();
83 })
84 .boxed(),
85 )
86 } else {
87 None
88 }
89 }
90}
91
92impl Room {
93 fn new(
94 id: u64,
95 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
96 client: Arc<Client>,
97 user_store: ModelHandle<UserStore>,
98 cx: &mut ModelContext<Self>,
99 ) -> Self {
100 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
101 let room = live_kit_client::Room::new();
102 let mut status = room.status();
103 // Consume the initial status of the room.
104 let _ = status.try_recv();
105 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
106 while let Some(status) = status.next().await {
107 let this = if let Some(this) = this.upgrade(&cx) {
108 this
109 } else {
110 break;
111 };
112
113 if status == live_kit_client::ConnectionState::Disconnected {
114 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
115 break;
116 }
117 }
118 });
119
120 let mut track_changes = room.remote_video_track_updates();
121 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
122 while let Some(track_change) = track_changes.next().await {
123 let this = if let Some(this) = this.upgrade(&cx) {
124 this
125 } else {
126 break;
127 };
128
129 this.update(&mut cx, |this, cx| {
130 this.remote_video_track_updated(track_change, cx).log_err()
131 });
132 }
133 });
134
135 cx.foreground()
136 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
137 .detach_and_log_err(cx);
138
139 Some(LiveKitRoom {
140 room,
141 screen_track: ScreenTrack::None,
142 next_publish_id: 0,
143 _maintain_room,
144 _maintain_tracks,
145 })
146 } else {
147 None
148 };
149
150 let maintain_connection =
151 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
152
153 Self {
154 id,
155 live_kit: live_kit_room,
156 status: RoomStatus::Online,
157 shared_projects: Default::default(),
158 joined_projects: Default::default(),
159 participant_user_ids: Default::default(),
160 local_participant: Default::default(),
161 remote_participants: Default::default(),
162 pending_participants: Default::default(),
163 pending_call_count: 0,
164 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
165 leave_when_empty: false,
166 pending_room_update: None,
167 client,
168 user_store,
169 follows_by_leader_id_project_id: Default::default(),
170 maintain_connection: Some(maintain_connection),
171 }
172 }
173
174 pub(crate) fn create(
175 called_user_id: u64,
176 initial_project: Option<ModelHandle<Project>>,
177 client: Arc<Client>,
178 user_store: ModelHandle<UserStore>,
179 cx: &mut MutableAppContext,
180 ) -> Task<Result<ModelHandle<Self>>> {
181 cx.spawn(|mut cx| async move {
182 let response = client.request(proto::CreateRoom {}).await?;
183 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
184 let room = cx.add_model(|cx| {
185 Self::new(
186 room_proto.id,
187 response.live_kit_connection_info,
188 client,
189 user_store,
190 cx,
191 )
192 });
193
194 let initial_project_id = if let Some(initial_project) = initial_project {
195 let initial_project_id = room
196 .update(&mut cx, |room, cx| {
197 room.share_project(initial_project.clone(), cx)
198 })
199 .await?;
200 Some(initial_project_id)
201 } else {
202 None
203 };
204
205 match room
206 .update(&mut cx, |room, cx| {
207 room.leave_when_empty = true;
208 room.call(called_user_id, initial_project_id, cx)
209 })
210 .await
211 {
212 Ok(()) => Ok(room),
213 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
214 }
215 })
216 }
217
218 pub(crate) fn join(
219 call: &IncomingCall,
220 client: Arc<Client>,
221 user_store: ModelHandle<UserStore>,
222 cx: &mut MutableAppContext,
223 ) -> Task<Result<ModelHandle<Self>>> {
224 let room_id = call.room_id;
225 cx.spawn(|mut cx| async move {
226 let response = client.request(proto::JoinRoom { id: room_id }).await?;
227 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
228 let room = cx.add_model(|cx| {
229 Self::new(
230 room_id,
231 response.live_kit_connection_info,
232 client,
233 user_store,
234 cx,
235 )
236 });
237 room.update(&mut cx, |room, cx| {
238 room.leave_when_empty = true;
239 room.apply_room_update(room_proto, cx)?;
240 anyhow::Ok(())
241 })?;
242 Ok(room)
243 })
244 }
245
246 fn should_leave(&self) -> bool {
247 self.leave_when_empty
248 && self.pending_room_update.is_none()
249 && self.pending_participants.is_empty()
250 && self.remote_participants.is_empty()
251 && self.pending_call_count == 0
252 }
253
254 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
255 cx.notify();
256 cx.emit(Event::Left);
257 self.leave_internal(cx)
258 }
259
260 fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task<Result<()>> {
261 if self.status.is_offline() {
262 return Task::ready(Err(anyhow!("room is offline")));
263 }
264
265 log::info!("leaving room");
266
267 for project in self.shared_projects.drain() {
268 if let Some(project) = project.upgrade(cx) {
269 project.update(cx, |project, cx| {
270 project.unshare(cx).log_err();
271 });
272 }
273 }
274 for project in self.joined_projects.drain() {
275 if let Some(project) = project.upgrade(cx) {
276 project.update(cx, |project, cx| {
277 project.disconnected_from_host(cx);
278 project.close(cx);
279 });
280 }
281 }
282
283 self.status = RoomStatus::Offline;
284 self.remote_participants.clear();
285 self.pending_participants.clear();
286 self.participant_user_ids.clear();
287 self.subscriptions.clear();
288 self.live_kit.take();
289 self.pending_room_update.take();
290 self.maintain_connection.take();
291
292 let leave_room = self.client.request(proto::LeaveRoom {});
293 cx.background().spawn(async move {
294 leave_room.await?;
295 anyhow::Ok(())
296 })
297 }
298
299 async fn maintain_connection(
300 this: WeakModelHandle<Self>,
301 client: Arc<Client>,
302 mut cx: AsyncAppContext,
303 ) -> Result<()> {
304 let mut client_status = client.status();
305 loop {
306 let _ = client_status.try_recv();
307 let is_connected = client_status.borrow().is_connected();
308 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
309 if !is_connected || client_status.next().await.is_some() {
310 log::info!("detected client disconnection");
311
312 this.upgrade(&cx)
313 .ok_or_else(|| anyhow!("room was dropped"))?
314 .update(&mut cx, |this, cx| {
315 this.status = RoomStatus::Rejoining;
316 cx.notify();
317 });
318
319 // Wait for client to re-establish a connection to the server.
320 {
321 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
322 let client_reconnection = async {
323 let mut remaining_attempts = 3;
324 while remaining_attempts > 0 {
325 if client_status.borrow().is_connected() {
326 log::info!("client reconnected, attempting to rejoin room");
327
328 let Some(this) = this.upgrade(&cx) else { break };
329 if this
330 .update(&mut cx, |this, cx| this.rejoin(cx))
331 .await
332 .log_err()
333 .is_some()
334 {
335 return true;
336 } else {
337 remaining_attempts -= 1;
338 }
339 } else if client_status.borrow().is_signed_out() {
340 return false;
341 }
342
343 log::info!(
344 "waiting for client status change, remaining attempts {}",
345 remaining_attempts
346 );
347 client_status.next().await;
348 }
349 false
350 }
351 .fuse();
352 futures::pin_mut!(client_reconnection);
353
354 futures::select_biased! {
355 reconnected = client_reconnection => {
356 if reconnected {
357 log::info!("successfully reconnected to room");
358 // If we successfully joined the room, go back around the loop
359 // waiting for future connection status changes.
360 continue;
361 }
362 }
363 _ = reconnection_timeout => {
364 log::info!("room reconnection timeout expired");
365 }
366 }
367 }
368
369 break;
370 }
371 }
372
373 // The client failed to re-establish a connection to the server
374 // or an error occurred while trying to re-join the room. Either way
375 // we leave the room and return an error.
376 if let Some(this) = this.upgrade(&cx) {
377 log::info!("reconnection failed, leaving room");
378 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
379 }
380 Err(anyhow!(
381 "can't reconnect to room: client failed to re-establish connection"
382 ))
383 }
384
385 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
386 let mut projects = HashMap::default();
387 let mut reshared_projects = Vec::new();
388 let mut rejoined_projects = Vec::new();
389 self.shared_projects.retain(|project| {
390 if let Some(handle) = project.upgrade(cx) {
391 let project = handle.read(cx);
392 if let Some(project_id) = project.remote_id() {
393 projects.insert(project_id, handle.clone());
394 reshared_projects.push(proto::UpdateProject {
395 project_id,
396 worktrees: project.worktree_metadata_protos(cx),
397 });
398 return true;
399 }
400 }
401 false
402 });
403 self.joined_projects.retain(|project| {
404 if let Some(handle) = project.upgrade(cx) {
405 let project = handle.read(cx);
406 if let Some(project_id) = project.remote_id() {
407 projects.insert(project_id, handle.clone());
408 rejoined_projects.push(proto::RejoinProject {
409 id: project_id,
410 worktrees: project
411 .worktrees(cx)
412 .map(|worktree| {
413 let worktree = worktree.read(cx);
414 proto::RejoinWorktree {
415 id: worktree.id().to_proto(),
416 scan_id: worktree.completed_scan_id() as u64,
417 }
418 })
419 .collect(),
420 });
421 }
422 return true;
423 }
424 false
425 });
426
427 let response = self.client.request(proto::RejoinRoom {
428 id: self.id,
429 reshared_projects,
430 rejoined_projects,
431 });
432
433 cx.spawn(|this, mut cx| async move {
434 let response = response.await?;
435 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
436 this.update(&mut cx, |this, cx| {
437 this.status = RoomStatus::Online;
438 this.apply_room_update(room_proto, cx)?;
439
440 for reshared_project in response.reshared_projects {
441 if let Some(project) = projects.get(&reshared_project.id) {
442 project.update(cx, |project, cx| {
443 project.reshared(reshared_project, cx).log_err();
444 });
445 }
446 }
447
448 for rejoined_project in response.rejoined_projects {
449 if let Some(project) = projects.get(&rejoined_project.id) {
450 project.update(cx, |project, cx| {
451 project.rejoined(rejoined_project, cx).log_err();
452 });
453 }
454 }
455
456 anyhow::Ok(())
457 })
458 })
459 }
460
461 pub fn id(&self) -> u64 {
462 self.id
463 }
464
465 pub fn status(&self) -> RoomStatus {
466 self.status
467 }
468
469 pub fn local_participant(&self) -> &LocalParticipant {
470 &self.local_participant
471 }
472
473 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
474 &self.remote_participants
475 }
476
477 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
478 self.remote_participants
479 .values()
480 .find(|p| p.peer_id == peer_id)
481 }
482
483 pub fn pending_participants(&self) -> &[Arc<User>] {
484 &self.pending_participants
485 }
486
487 pub fn contains_participant(&self, user_id: u64) -> bool {
488 self.participant_user_ids.contains(&user_id)
489 }
490
491 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
492 self.follows_by_leader_id_project_id
493 .get(&(leader_id, project_id))
494 .map_or(&[], |v| v.as_slice())
495 }
496
497 async fn handle_room_updated(
498 this: ModelHandle<Self>,
499 envelope: TypedEnvelope<proto::RoomUpdated>,
500 _: Arc<Client>,
501 mut cx: AsyncAppContext,
502 ) -> Result<()> {
503 let room = envelope
504 .payload
505 .room
506 .ok_or_else(|| anyhow!("invalid room"))?;
507 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
508 }
509
510 fn apply_room_update(
511 &mut self,
512 mut room: proto::Room,
513 cx: &mut ModelContext<Self>,
514 ) -> Result<()> {
515 // Filter ourselves out from the room's participants.
516 let local_participant_ix = room
517 .participants
518 .iter()
519 .position(|participant| Some(participant.user_id) == self.client.user_id());
520 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
521
522 let pending_participant_user_ids = room
523 .pending_participants
524 .iter()
525 .map(|p| p.user_id)
526 .collect::<Vec<_>>();
527
528 let remote_participant_user_ids = room
529 .participants
530 .iter()
531 .map(|p| p.user_id)
532 .collect::<Vec<_>>();
533
534 let (remote_participants, pending_participants) =
535 self.user_store.update(cx, move |user_store, cx| {
536 (
537 user_store.get_users(remote_participant_user_ids, cx),
538 user_store.get_users(pending_participant_user_ids, cx),
539 )
540 });
541
542 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
543 let (remote_participants, pending_participants) =
544 futures::join!(remote_participants, pending_participants);
545
546 this.update(&mut cx, |this, cx| {
547 this.participant_user_ids.clear();
548
549 if let Some(participant) = local_participant {
550 this.local_participant.projects = participant.projects;
551 } else {
552 this.local_participant.projects.clear();
553 }
554
555 if let Some(participants) = remote_participants.log_err() {
556 for (participant, user) in room.participants.into_iter().zip(participants) {
557 let Some(peer_id) = participant.peer_id else { continue };
558 this.participant_user_ids.insert(participant.user_id);
559
560 let old_projects = this
561 .remote_participants
562 .get(&participant.user_id)
563 .into_iter()
564 .flat_map(|existing| &existing.projects)
565 .map(|project| project.id)
566 .collect::<HashSet<_>>();
567 let new_projects = participant
568 .projects
569 .iter()
570 .map(|project| project.id)
571 .collect::<HashSet<_>>();
572
573 for project in &participant.projects {
574 if !old_projects.contains(&project.id) {
575 cx.emit(Event::RemoteProjectShared {
576 owner: user.clone(),
577 project_id: project.id,
578 worktree_root_names: project.worktree_root_names.clone(),
579 });
580 }
581 }
582
583 for unshared_project_id in old_projects.difference(&new_projects) {
584 this.joined_projects.retain(|project| {
585 if let Some(project) = project.upgrade(cx) {
586 project.update(cx, |project, cx| {
587 if project.remote_id() == Some(*unshared_project_id) {
588 project.disconnected_from_host(cx);
589 false
590 } else {
591 true
592 }
593 })
594 } else {
595 false
596 }
597 });
598 cx.emit(Event::RemoteProjectUnshared {
599 project_id: *unshared_project_id,
600 });
601 }
602
603 let location = ParticipantLocation::from_proto(participant.location)
604 .unwrap_or(ParticipantLocation::External);
605 if let Some(remote_participant) =
606 this.remote_participants.get_mut(&participant.user_id)
607 {
608 remote_participant.projects = participant.projects;
609 remote_participant.peer_id = peer_id;
610 if location != remote_participant.location {
611 remote_participant.location = location;
612 cx.emit(Event::ParticipantLocationChanged {
613 participant_id: peer_id,
614 });
615 }
616 } else {
617 this.remote_participants.insert(
618 participant.user_id,
619 RemoteParticipant {
620 user: user.clone(),
621 peer_id,
622 projects: participant.projects,
623 location,
624 tracks: Default::default(),
625 },
626 );
627
628 if let Some(live_kit) = this.live_kit.as_ref() {
629 let tracks =
630 live_kit.room.remote_video_tracks(&user.id.to_string());
631 for track in tracks {
632 this.remote_video_track_updated(
633 RemoteVideoTrackUpdate::Subscribed(track),
634 cx,
635 )
636 .log_err();
637 }
638 }
639 }
640 }
641
642 this.remote_participants.retain(|user_id, participant| {
643 if this.participant_user_ids.contains(user_id) {
644 true
645 } else {
646 for project in &participant.projects {
647 cx.emit(Event::RemoteProjectUnshared {
648 project_id: project.id,
649 });
650 }
651 false
652 }
653 });
654 }
655
656 if let Some(pending_participants) = pending_participants.log_err() {
657 this.pending_participants = pending_participants;
658 for participant in &this.pending_participants {
659 this.participant_user_ids.insert(participant.id);
660 }
661 }
662
663 this.follows_by_leader_id_project_id.clear();
664 for follower in room.followers {
665 let project_id = follower.project_id;
666 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
667 (Some(leader), Some(follower)) => (leader, follower),
668
669 _ => {
670 log::error!("Follower message {follower:?} missing some state");
671 continue;
672 }
673 };
674
675 let list = this
676 .follows_by_leader_id_project_id
677 .entry((leader, project_id))
678 .or_insert(Vec::new());
679 if !list.contains(&follower) {
680 list.push(follower);
681 }
682 }
683
684 this.pending_room_update.take();
685 if this.should_leave() {
686 log::info!("room is empty, leaving");
687 let _ = this.leave(cx);
688 }
689
690 this.check_invariants();
691 cx.notify();
692 });
693 }));
694
695 cx.notify();
696 Ok(())
697 }
698
699 fn remote_video_track_updated(
700 &mut self,
701 change: RemoteVideoTrackUpdate,
702 cx: &mut ModelContext<Self>,
703 ) -> Result<()> {
704 match change {
705 RemoteVideoTrackUpdate::Subscribed(track) => {
706 let user_id = track.publisher_id().parse()?;
707 let track_id = track.sid().to_string();
708 let participant = self
709 .remote_participants
710 .get_mut(&user_id)
711 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
712 participant.tracks.insert(
713 track_id.clone(),
714 Arc::new(RemoteVideoTrack {
715 live_kit_track: track,
716 }),
717 );
718 cx.emit(Event::RemoteVideoTracksChanged {
719 participant_id: participant.peer_id,
720 });
721 }
722 RemoteVideoTrackUpdate::Unsubscribed {
723 publisher_id,
724 track_id,
725 } => {
726 let user_id = publisher_id.parse()?;
727 let participant = self
728 .remote_participants
729 .get_mut(&user_id)
730 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
731 participant.tracks.remove(&track_id);
732 cx.emit(Event::RemoteVideoTracksChanged {
733 participant_id: participant.peer_id,
734 });
735 }
736 }
737
738 cx.notify();
739 Ok(())
740 }
741
742 fn check_invariants(&self) {
743 #[cfg(any(test, feature = "test-support"))]
744 {
745 for participant in self.remote_participants.values() {
746 assert!(self.participant_user_ids.contains(&participant.user.id));
747 assert_ne!(participant.user.id, self.client.user_id().unwrap());
748 }
749
750 for participant in &self.pending_participants {
751 assert!(self.participant_user_ids.contains(&participant.id));
752 assert_ne!(participant.id, self.client.user_id().unwrap());
753 }
754
755 assert_eq!(
756 self.participant_user_ids.len(),
757 self.remote_participants.len() + self.pending_participants.len()
758 );
759 }
760 }
761
762 pub(crate) fn call(
763 &mut self,
764 called_user_id: u64,
765 initial_project_id: Option<u64>,
766 cx: &mut ModelContext<Self>,
767 ) -> Task<Result<()>> {
768 if self.status.is_offline() {
769 return Task::ready(Err(anyhow!("room is offline")));
770 }
771
772 cx.notify();
773 let client = self.client.clone();
774 let room_id = self.id;
775 self.pending_call_count += 1;
776 cx.spawn(|this, mut cx| async move {
777 let result = client
778 .request(proto::Call {
779 room_id,
780 called_user_id,
781 initial_project_id,
782 })
783 .await;
784 this.update(&mut cx, |this, cx| {
785 this.pending_call_count -= 1;
786 if this.should_leave() {
787 this.leave(cx).detach_and_log_err(cx);
788 }
789 });
790 result?;
791 Ok(())
792 })
793 }
794
795 pub fn join_project(
796 &mut self,
797 id: u64,
798 language_registry: Arc<LanguageRegistry>,
799 fs: Arc<dyn Fs>,
800 cx: &mut ModelContext<Self>,
801 ) -> Task<Result<ModelHandle<Project>>> {
802 let client = self.client.clone();
803 let user_store = self.user_store.clone();
804 cx.spawn(|this, mut cx| async move {
805 let project =
806 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
807 this.update(&mut cx, |this, cx| {
808 this.joined_projects.retain(|project| {
809 if let Some(project) = project.upgrade(cx) {
810 !project.read(cx).is_read_only()
811 } else {
812 false
813 }
814 });
815 this.joined_projects.insert(project.downgrade());
816 });
817 Ok(project)
818 })
819 }
820
821 pub(crate) fn share_project(
822 &mut self,
823 project: ModelHandle<Project>,
824 cx: &mut ModelContext<Self>,
825 ) -> Task<Result<u64>> {
826 if let Some(project_id) = project.read(cx).remote_id() {
827 return Task::ready(Ok(project_id));
828 }
829
830 let request = self.client.request(proto::ShareProject {
831 room_id: self.id(),
832 worktrees: project.read(cx).worktree_metadata_protos(cx),
833 });
834 cx.spawn(|this, mut cx| async move {
835 let response = request.await?;
836
837 project.update(&mut cx, |project, cx| {
838 project.shared(response.project_id, cx)
839 })?;
840
841 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
842 this.update(&mut cx, |this, cx| {
843 this.shared_projects.insert(project.downgrade());
844 let active_project = this.local_participant.active_project.as_ref();
845 if active_project.map_or(false, |location| *location == project) {
846 this.set_location(Some(&project), cx)
847 } else {
848 Task::ready(Ok(()))
849 }
850 })
851 .await?;
852
853 Ok(response.project_id)
854 })
855 }
856
857 pub(crate) fn unshare_project(
858 &mut self,
859 project: ModelHandle<Project>,
860 cx: &mut ModelContext<Self>,
861 ) -> Result<()> {
862 let project_id = match project.read(cx).remote_id() {
863 Some(project_id) => project_id,
864 None => return Ok(()),
865 };
866
867 self.client.send(proto::UnshareProject { project_id })?;
868 project.update(cx, |this, cx| this.unshare(cx))
869 }
870
871 pub(crate) fn set_location(
872 &mut self,
873 project: Option<&ModelHandle<Project>>,
874 cx: &mut ModelContext<Self>,
875 ) -> Task<Result<()>> {
876 if self.status.is_offline() {
877 return Task::ready(Err(anyhow!("room is offline")));
878 }
879
880 let client = self.client.clone();
881 let room_id = self.id;
882 let location = if let Some(project) = project {
883 self.local_participant.active_project = Some(project.downgrade());
884 if let Some(project_id) = project.read(cx).remote_id() {
885 proto::participant_location::Variant::SharedProject(
886 proto::participant_location::SharedProject { id: project_id },
887 )
888 } else {
889 proto::participant_location::Variant::UnsharedProject(
890 proto::participant_location::UnsharedProject {},
891 )
892 }
893 } else {
894 self.local_participant.active_project = None;
895 proto::participant_location::Variant::External(proto::participant_location::External {})
896 };
897
898 cx.notify();
899 cx.foreground().spawn(async move {
900 client
901 .request(proto::UpdateParticipantLocation {
902 room_id,
903 location: Some(proto::ParticipantLocation {
904 variant: Some(location),
905 }),
906 })
907 .await?;
908 Ok(())
909 })
910 }
911
912 pub fn is_screen_sharing(&self) -> bool {
913 self.live_kit.as_ref().map_or(false, |live_kit| {
914 !matches!(live_kit.screen_track, ScreenTrack::None)
915 })
916 }
917
918 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
919 if self.status.is_offline() {
920 return Task::ready(Err(anyhow!("room is offline")));
921 } else if self.is_screen_sharing() {
922 return Task::ready(Err(anyhow!("screen was already shared")));
923 }
924
925 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
926 let publish_id = post_inc(&mut live_kit.next_publish_id);
927 live_kit.screen_track = ScreenTrack::Pending { publish_id };
928 cx.notify();
929 (live_kit.room.display_sources(), publish_id)
930 } else {
931 return Task::ready(Err(anyhow!("live-kit was not initialized")));
932 };
933
934 cx.spawn_weak(|this, mut cx| async move {
935 let publish_track = async {
936 let displays = displays.await?;
937 let display = displays
938 .first()
939 .ok_or_else(|| anyhow!("no display found"))?;
940 let track = LocalVideoTrack::screen_share_for_display(&display);
941 this.upgrade(&cx)
942 .ok_or_else(|| anyhow!("room was dropped"))?
943 .read_with(&cx, |this, _| {
944 this.live_kit
945 .as_ref()
946 .map(|live_kit| live_kit.room.publish_video_track(&track))
947 })
948 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
949 .await
950 };
951
952 let publication = publish_track.await;
953 this.upgrade(&cx)
954 .ok_or_else(|| anyhow!("room was dropped"))?
955 .update(&mut cx, |this, cx| {
956 let live_kit = this
957 .live_kit
958 .as_mut()
959 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
960
961 let canceled = if let ScreenTrack::Pending {
962 publish_id: cur_publish_id,
963 } = &live_kit.screen_track
964 {
965 *cur_publish_id != publish_id
966 } else {
967 true
968 };
969
970 match publication {
971 Ok(publication) => {
972 if canceled {
973 live_kit.room.unpublish_track(publication);
974 } else {
975 live_kit.screen_track = ScreenTrack::Published(publication);
976 cx.notify();
977 }
978 Ok(())
979 }
980 Err(error) => {
981 if canceled {
982 Ok(())
983 } else {
984 live_kit.screen_track = ScreenTrack::None;
985 cx.notify();
986 Err(error)
987 }
988 }
989 }
990 })
991 })
992 }
993
994 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
995 if self.status.is_offline() {
996 return Err(anyhow!("room is offline"));
997 }
998
999 let live_kit = self
1000 .live_kit
1001 .as_mut()
1002 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1003 match mem::take(&mut live_kit.screen_track) {
1004 ScreenTrack::None => Err(anyhow!("screen was not shared")),
1005 ScreenTrack::Pending { .. } => {
1006 cx.notify();
1007 Ok(())
1008 }
1009 ScreenTrack::Published(track) => {
1010 live_kit.room.unpublish_track(track);
1011 cx.notify();
1012 Ok(())
1013 }
1014 }
1015 }
1016
1017 #[cfg(any(test, feature = "test-support"))]
1018 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1019 self.live_kit
1020 .as_ref()
1021 .unwrap()
1022 .room
1023 .set_display_sources(sources);
1024 }
1025}
1026
1027struct LiveKitRoom {
1028 room: Arc<live_kit_client::Room>,
1029 screen_track: ScreenTrack,
1030 next_publish_id: usize,
1031 _maintain_room: Task<()>,
1032 _maintain_tracks: Task<()>,
1033}
1034
1035enum ScreenTrack {
1036 None,
1037 Pending { publish_id: usize },
1038 Published(LocalTrackPublication),
1039}
1040
1041impl Default for ScreenTrack {
1042 fn default() -> Self {
1043 Self::None
1044 }
1045}
1046
1047#[derive(Copy, Clone, PartialEq, Eq)]
1048pub enum RoomStatus {
1049 Online,
1050 Rejoining,
1051 Offline,
1052}
1053
1054impl RoomStatus {
1055 pub fn is_offline(&self) -> bool {
1056 matches!(self, RoomStatus::Offline)
1057 }
1058
1059 pub fn is_online(&self) -> bool {
1060 matches!(self, RoomStatus::Online)
1061 }
1062}