1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{
14 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
15};
16use language::LanguageRegistry;
17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
18use postage::stream::Stream;
19use project::Project;
20use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub enum Event {
27 ParticipantLocationChanged {
28 participant_id: proto::PeerId,
29 },
30 RemoteVideoTracksChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteProjectShared {
34 owner: Arc<User>,
35 project_id: u64,
36 worktree_root_names: Vec<String>,
37 },
38 RemoteProjectUnshared {
39 project_id: u64,
40 },
41 Left,
42}
43
44pub struct Room {
45 id: u64,
46 live_kit: Option<LiveKitRoom>,
47 status: RoomStatus,
48 shared_projects: HashSet<WeakModelHandle<Project>>,
49 joined_projects: HashSet<WeakModelHandle<Project>>,
50 local_participant: LocalParticipant,
51 remote_participants: BTreeMap<u64, RemoteParticipant>,
52 pending_participants: Vec<Arc<User>>,
53 participant_user_ids: HashSet<u64>,
54 pending_call_count: usize,
55 leave_when_empty: bool,
56 client: Arc<Client>,
57 user_store: ModelHandle<UserStore>,
58 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
59 subscriptions: Vec<client::Subscription>,
60 pending_room_update: Option<Task<()>>,
61 maintain_connection: Option<Task<Option<()>>>,
62}
63
64impl Entity for Room {
65 type Event = Event;
66
67 fn release(&mut self, cx: &mut MutableAppContext) {
68 if self.status.is_online() {
69 self.leave_internal(cx).detach_and_log_err(cx);
70 }
71 }
72
73 fn app_will_quit(
74 &mut self,
75 cx: &mut MutableAppContext,
76 ) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
77 if self.status.is_online() {
78 let leave = self.leave_internal(cx);
79 Some(
80 cx.background()
81 .spawn(async move {
82 leave.await.log_err();
83 })
84 .boxed(),
85 )
86 } else {
87 None
88 }
89 }
90}
91
92impl Room {
93 fn new(
94 id: u64,
95 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
96 client: Arc<Client>,
97 user_store: ModelHandle<UserStore>,
98 cx: &mut ModelContext<Self>,
99 ) -> Self {
100 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
101 let room = live_kit_client::Room::new();
102 let mut status = room.status();
103 // Consume the initial status of the room.
104 let _ = status.try_recv();
105 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
106 while let Some(status) = status.next().await {
107 let this = if let Some(this) = this.upgrade(&cx) {
108 this
109 } else {
110 break;
111 };
112
113 if status == live_kit_client::ConnectionState::Disconnected {
114 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
115 break;
116 }
117 }
118 });
119
120 let mut track_changes = room.remote_video_track_updates();
121 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
122 while let Some(track_change) = track_changes.next().await {
123 let this = if let Some(this) = this.upgrade(&cx) {
124 this
125 } else {
126 break;
127 };
128
129 this.update(&mut cx, |this, cx| {
130 this.remote_video_track_updated(track_change, cx).log_err()
131 });
132 }
133 });
134
135 cx.foreground()
136 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
137 .detach_and_log_err(cx);
138
139 Some(LiveKitRoom {
140 room,
141 screen_track: ScreenTrack::None,
142 next_publish_id: 0,
143 _maintain_room,
144 _maintain_tracks,
145 })
146 } else {
147 None
148 };
149
150 let maintain_connection =
151 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
152
153 Self {
154 id,
155 live_kit: live_kit_room,
156 status: RoomStatus::Online,
157 shared_projects: Default::default(),
158 joined_projects: Default::default(),
159 participant_user_ids: Default::default(),
160 local_participant: Default::default(),
161 remote_participants: Default::default(),
162 pending_participants: Default::default(),
163 pending_call_count: 0,
164 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
165 leave_when_empty: false,
166 pending_room_update: None,
167 client,
168 user_store,
169 follows_by_leader_id_project_id: Default::default(),
170 maintain_connection: Some(maintain_connection),
171 }
172 }
173
174 pub(crate) fn create(
175 called_user_id: u64,
176 initial_project: Option<ModelHandle<Project>>,
177 client: Arc<Client>,
178 user_store: ModelHandle<UserStore>,
179 cx: &mut MutableAppContext,
180 ) -> Task<Result<ModelHandle<Self>>> {
181 cx.spawn(|mut cx| async move {
182 let response = client.request(proto::CreateRoom {}).await?;
183 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
184 let room = cx.add_model(|cx| {
185 Self::new(
186 room_proto.id,
187 response.live_kit_connection_info,
188 client,
189 user_store,
190 cx,
191 )
192 });
193
194 let initial_project_id = if let Some(initial_project) = initial_project {
195 let initial_project_id = room
196 .update(&mut cx, |room, cx| {
197 room.share_project(initial_project.clone(), cx)
198 })
199 .await?;
200 Some(initial_project_id)
201 } else {
202 None
203 };
204
205 match room
206 .update(&mut cx, |room, cx| {
207 room.leave_when_empty = true;
208 room.call(called_user_id, initial_project_id, cx)
209 })
210 .await
211 {
212 Ok(()) => Ok(room),
213 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
214 }
215 })
216 }
217
218 pub(crate) fn join(
219 call: &IncomingCall,
220 client: Arc<Client>,
221 user_store: ModelHandle<UserStore>,
222 cx: &mut MutableAppContext,
223 ) -> Task<Result<ModelHandle<Self>>> {
224 let room_id = call.room_id;
225 cx.spawn(|mut cx| async move {
226 let response = client.request(proto::JoinRoom { id: room_id }).await?;
227 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
228 let room = cx.add_model(|cx| {
229 Self::new(
230 room_id,
231 response.live_kit_connection_info,
232 client,
233 user_store,
234 cx,
235 )
236 });
237 room.update(&mut cx, |room, cx| {
238 room.leave_when_empty = true;
239 room.apply_room_update(room_proto, cx)?;
240 anyhow::Ok(())
241 })?;
242 Ok(room)
243 })
244 }
245
246 fn should_leave(&self) -> bool {
247 self.leave_when_empty
248 && self.pending_room_update.is_none()
249 && self.pending_participants.is_empty()
250 && self.remote_participants.is_empty()
251 && self.pending_call_count == 0
252 }
253
254 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
255 cx.notify();
256 cx.emit(Event::Left);
257 self.leave_internal(cx)
258 }
259
260 fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task<Result<()>> {
261 if self.status.is_offline() {
262 return Task::ready(Err(anyhow!("room is offline")));
263 }
264
265 log::info!("leaving room");
266
267 for project in self.shared_projects.drain() {
268 if let Some(project) = project.upgrade(cx) {
269 project.update(cx, |project, cx| {
270 project.unshare(cx).log_err();
271 });
272 }
273 }
274 for project in self.joined_projects.drain() {
275 if let Some(project) = project.upgrade(cx) {
276 project.update(cx, |project, cx| {
277 project.disconnected_from_host(cx);
278 });
279 }
280 }
281
282 self.status = RoomStatus::Offline;
283 self.remote_participants.clear();
284 self.pending_participants.clear();
285 self.participant_user_ids.clear();
286 self.subscriptions.clear();
287 self.live_kit.take();
288 self.pending_room_update.take();
289 self.maintain_connection.take();
290
291 let leave_room = self.client.request(proto::LeaveRoom {});
292 cx.background().spawn(async move {
293 leave_room.await?;
294 anyhow::Ok(())
295 })
296 }
297
298 async fn maintain_connection(
299 this: WeakModelHandle<Self>,
300 client: Arc<Client>,
301 mut cx: AsyncAppContext,
302 ) -> Result<()> {
303 let mut client_status = client.status();
304 loop {
305 let _ = client_status.try_recv();
306 let is_connected = client_status.borrow().is_connected();
307 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
308 if !is_connected || client_status.next().await.is_some() {
309 log::info!("detected client disconnection");
310
311 this.upgrade(&cx)
312 .ok_or_else(|| anyhow!("room was dropped"))?
313 .update(&mut cx, |this, cx| {
314 this.status = RoomStatus::Rejoining;
315 cx.notify();
316 });
317
318 // Wait for client to re-establish a connection to the server.
319 {
320 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
321 let client_reconnection = async {
322 let mut remaining_attempts = 3;
323 while remaining_attempts > 0 {
324 if client_status.borrow().is_connected() {
325 log::info!("client reconnected, attempting to rejoin room");
326
327 let Some(this) = this.upgrade(&cx) else { break };
328 if this
329 .update(&mut cx, |this, cx| this.rejoin(cx))
330 .await
331 .log_err()
332 .is_some()
333 {
334 return true;
335 } else {
336 remaining_attempts -= 1;
337 }
338 } else if client_status.borrow().is_signed_out() {
339 return false;
340 }
341
342 log::info!(
343 "waiting for client status change, remaining attempts {}",
344 remaining_attempts
345 );
346 client_status.next().await;
347 }
348 false
349 }
350 .fuse();
351 futures::pin_mut!(client_reconnection);
352
353 futures::select_biased! {
354 reconnected = client_reconnection => {
355 if reconnected {
356 log::info!("successfully reconnected to room");
357 // If we successfully joined the room, go back around the loop
358 // waiting for future connection status changes.
359 continue;
360 }
361 }
362 _ = reconnection_timeout => {
363 log::info!("room reconnection timeout expired");
364 }
365 }
366 }
367
368 break;
369 }
370 }
371
372 // The client failed to re-establish a connection to the server
373 // or an error occurred while trying to re-join the room. Either way
374 // we leave the room and return an error.
375 if let Some(this) = this.upgrade(&cx) {
376 log::info!("reconnection failed, leaving room");
377 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
378 }
379 Err(anyhow!(
380 "can't reconnect to room: client failed to re-establish connection"
381 ))
382 }
383
384 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
385 let mut projects = HashMap::default();
386 let mut reshared_projects = Vec::new();
387 let mut rejoined_projects = Vec::new();
388 self.shared_projects.retain(|project| {
389 if let Some(handle) = project.upgrade(cx) {
390 let project = handle.read(cx);
391 if let Some(project_id) = project.remote_id() {
392 projects.insert(project_id, handle.clone());
393 reshared_projects.push(proto::UpdateProject {
394 project_id,
395 worktrees: project.worktree_metadata_protos(cx),
396 });
397 return true;
398 }
399 }
400 false
401 });
402 self.joined_projects.retain(|project| {
403 if let Some(handle) = project.upgrade(cx) {
404 let project = handle.read(cx);
405 if let Some(project_id) = project.remote_id() {
406 projects.insert(project_id, handle.clone());
407 rejoined_projects.push(proto::RejoinProject {
408 id: project_id,
409 worktrees: project
410 .worktrees(cx)
411 .map(|worktree| {
412 let worktree = worktree.read(cx);
413 proto::RejoinWorktree {
414 id: worktree.id().to_proto(),
415 scan_id: worktree.completed_scan_id() as u64,
416 }
417 })
418 .collect(),
419 });
420 }
421 return true;
422 }
423 false
424 });
425
426 let response = self.client.request(proto::RejoinRoom {
427 id: self.id,
428 reshared_projects,
429 rejoined_projects,
430 });
431
432 cx.spawn(|this, mut cx| async move {
433 let response = response.await?;
434 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
435 this.update(&mut cx, |this, cx| {
436 this.status = RoomStatus::Online;
437 this.apply_room_update(room_proto, cx)?;
438
439 for reshared_project in response.reshared_projects {
440 if let Some(project) = projects.get(&reshared_project.id) {
441 project.update(cx, |project, cx| {
442 project.reshared(reshared_project, cx).log_err();
443 });
444 }
445 }
446
447 for rejoined_project in response.rejoined_projects {
448 if let Some(project) = projects.get(&rejoined_project.id) {
449 project.update(cx, |project, cx| {
450 project.rejoined(rejoined_project, cx).log_err();
451 });
452 }
453 }
454
455 anyhow::Ok(())
456 })
457 })
458 }
459
460 pub fn id(&self) -> u64 {
461 self.id
462 }
463
464 pub fn status(&self) -> RoomStatus {
465 self.status
466 }
467
468 pub fn local_participant(&self) -> &LocalParticipant {
469 &self.local_participant
470 }
471
472 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
473 &self.remote_participants
474 }
475
476 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
477 self.remote_participants
478 .values()
479 .find(|p| p.peer_id == peer_id)
480 }
481
482 pub fn pending_participants(&self) -> &[Arc<User>] {
483 &self.pending_participants
484 }
485
486 pub fn contains_participant(&self, user_id: u64) -> bool {
487 self.participant_user_ids.contains(&user_id)
488 }
489
490 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
491 self.follows_by_leader_id_project_id
492 .get(&(leader_id, project_id))
493 .map_or(&[], |v| v.as_slice())
494 }
495
496 async fn handle_room_updated(
497 this: ModelHandle<Self>,
498 envelope: TypedEnvelope<proto::RoomUpdated>,
499 _: Arc<Client>,
500 mut cx: AsyncAppContext,
501 ) -> Result<()> {
502 let room = envelope
503 .payload
504 .room
505 .ok_or_else(|| anyhow!("invalid room"))?;
506 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
507 }
508
509 fn apply_room_update(
510 &mut self,
511 mut room: proto::Room,
512 cx: &mut ModelContext<Self>,
513 ) -> Result<()> {
514 // Filter ourselves out from the room's participants.
515 let local_participant_ix = room
516 .participants
517 .iter()
518 .position(|participant| Some(participant.user_id) == self.client.user_id());
519 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
520
521 let pending_participant_user_ids = room
522 .pending_participants
523 .iter()
524 .map(|p| p.user_id)
525 .collect::<Vec<_>>();
526
527 let remote_participant_user_ids = room
528 .participants
529 .iter()
530 .map(|p| p.user_id)
531 .collect::<Vec<_>>();
532
533 let (remote_participants, pending_participants) =
534 self.user_store.update(cx, move |user_store, cx| {
535 (
536 user_store.get_users(remote_participant_user_ids, cx),
537 user_store.get_users(pending_participant_user_ids, cx),
538 )
539 });
540
541 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
542 let (remote_participants, pending_participants) =
543 futures::join!(remote_participants, pending_participants);
544
545 this.update(&mut cx, |this, cx| {
546 this.participant_user_ids.clear();
547
548 if let Some(participant) = local_participant {
549 this.local_participant.projects = participant.projects;
550 } else {
551 this.local_participant.projects.clear();
552 }
553
554 if let Some(participants) = remote_participants.log_err() {
555 for (participant, user) in room.participants.into_iter().zip(participants) {
556 let Some(peer_id) = participant.peer_id else { continue };
557 this.participant_user_ids.insert(participant.user_id);
558
559 let old_projects = this
560 .remote_participants
561 .get(&participant.user_id)
562 .into_iter()
563 .flat_map(|existing| &existing.projects)
564 .map(|project| project.id)
565 .collect::<HashSet<_>>();
566 let new_projects = participant
567 .projects
568 .iter()
569 .map(|project| project.id)
570 .collect::<HashSet<_>>();
571
572 for project in &participant.projects {
573 if !old_projects.contains(&project.id) {
574 cx.emit(Event::RemoteProjectShared {
575 owner: user.clone(),
576 project_id: project.id,
577 worktree_root_names: project.worktree_root_names.clone(),
578 });
579 }
580 }
581
582 for unshared_project_id in old_projects.difference(&new_projects) {
583 this.joined_projects.retain(|project| {
584 if let Some(project) = project.upgrade(cx) {
585 project.update(cx, |project, cx| {
586 if project.remote_id() == Some(*unshared_project_id) {
587 project.disconnected_from_host(cx);
588 false
589 } else {
590 true
591 }
592 })
593 } else {
594 false
595 }
596 });
597 cx.emit(Event::RemoteProjectUnshared {
598 project_id: *unshared_project_id,
599 });
600 }
601
602 let location = ParticipantLocation::from_proto(participant.location)
603 .unwrap_or(ParticipantLocation::External);
604 if let Some(remote_participant) =
605 this.remote_participants.get_mut(&participant.user_id)
606 {
607 remote_participant.projects = participant.projects;
608 remote_participant.peer_id = peer_id;
609 if location != remote_participant.location {
610 remote_participant.location = location;
611 cx.emit(Event::ParticipantLocationChanged {
612 participant_id: peer_id,
613 });
614 }
615 } else {
616 this.remote_participants.insert(
617 participant.user_id,
618 RemoteParticipant {
619 user: user.clone(),
620 peer_id,
621 projects: participant.projects,
622 location,
623 tracks: Default::default(),
624 },
625 );
626
627 if let Some(live_kit) = this.live_kit.as_ref() {
628 let tracks =
629 live_kit.room.remote_video_tracks(&user.id.to_string());
630 for track in tracks {
631 this.remote_video_track_updated(
632 RemoteVideoTrackUpdate::Subscribed(track),
633 cx,
634 )
635 .log_err();
636 }
637 }
638 }
639 }
640
641 this.remote_participants.retain(|user_id, participant| {
642 if this.participant_user_ids.contains(user_id) {
643 true
644 } else {
645 for project in &participant.projects {
646 cx.emit(Event::RemoteProjectUnshared {
647 project_id: project.id,
648 });
649 }
650 false
651 }
652 });
653 }
654
655 if let Some(pending_participants) = pending_participants.log_err() {
656 this.pending_participants = pending_participants;
657 for participant in &this.pending_participants {
658 this.participant_user_ids.insert(participant.id);
659 }
660 }
661
662 this.follows_by_leader_id_project_id.clear();
663 for follower in room.followers {
664 let project_id = follower.project_id;
665 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
666 (Some(leader), Some(follower)) => (leader, follower),
667
668 _ => {
669 log::error!("Follower message {follower:?} missing some state");
670 continue;
671 }
672 };
673
674 let list = this
675 .follows_by_leader_id_project_id
676 .entry((leader, project_id))
677 .or_insert(Vec::new());
678 if !list.contains(&follower) {
679 list.push(follower);
680 }
681 }
682
683 this.pending_room_update.take();
684 if this.should_leave() {
685 log::info!("room is empty, leaving");
686 let _ = this.leave(cx);
687 }
688
689 this.check_invariants();
690 cx.notify();
691 });
692 }));
693
694 cx.notify();
695 Ok(())
696 }
697
698 fn remote_video_track_updated(
699 &mut self,
700 change: RemoteVideoTrackUpdate,
701 cx: &mut ModelContext<Self>,
702 ) -> Result<()> {
703 match change {
704 RemoteVideoTrackUpdate::Subscribed(track) => {
705 let user_id = track.publisher_id().parse()?;
706 let track_id = track.sid().to_string();
707 let participant = self
708 .remote_participants
709 .get_mut(&user_id)
710 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
711 participant.tracks.insert(
712 track_id.clone(),
713 Arc::new(RemoteVideoTrack {
714 live_kit_track: track,
715 }),
716 );
717 cx.emit(Event::RemoteVideoTracksChanged {
718 participant_id: participant.peer_id,
719 });
720 }
721 RemoteVideoTrackUpdate::Unsubscribed {
722 publisher_id,
723 track_id,
724 } => {
725 let user_id = publisher_id.parse()?;
726 let participant = self
727 .remote_participants
728 .get_mut(&user_id)
729 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
730 participant.tracks.remove(&track_id);
731 cx.emit(Event::RemoteVideoTracksChanged {
732 participant_id: participant.peer_id,
733 });
734 }
735 }
736
737 cx.notify();
738 Ok(())
739 }
740
741 fn check_invariants(&self) {
742 #[cfg(any(test, feature = "test-support"))]
743 {
744 for participant in self.remote_participants.values() {
745 assert!(self.participant_user_ids.contains(&participant.user.id));
746 assert_ne!(participant.user.id, self.client.user_id().unwrap());
747 }
748
749 for participant in &self.pending_participants {
750 assert!(self.participant_user_ids.contains(&participant.id));
751 assert_ne!(participant.id, self.client.user_id().unwrap());
752 }
753
754 assert_eq!(
755 self.participant_user_ids.len(),
756 self.remote_participants.len() + self.pending_participants.len()
757 );
758 }
759 }
760
761 pub(crate) fn call(
762 &mut self,
763 called_user_id: u64,
764 initial_project_id: Option<u64>,
765 cx: &mut ModelContext<Self>,
766 ) -> Task<Result<()>> {
767 if self.status.is_offline() {
768 return Task::ready(Err(anyhow!("room is offline")));
769 }
770
771 cx.notify();
772 let client = self.client.clone();
773 let room_id = self.id;
774 self.pending_call_count += 1;
775 cx.spawn(|this, mut cx| async move {
776 let result = client
777 .request(proto::Call {
778 room_id,
779 called_user_id,
780 initial_project_id,
781 })
782 .await;
783 this.update(&mut cx, |this, cx| {
784 this.pending_call_count -= 1;
785 if this.should_leave() {
786 this.leave(cx).detach_and_log_err(cx);
787 }
788 });
789 result?;
790 Ok(())
791 })
792 }
793
794 pub fn join_project(
795 &mut self,
796 id: u64,
797 language_registry: Arc<LanguageRegistry>,
798 fs: Arc<dyn Fs>,
799 cx: &mut ModelContext<Self>,
800 ) -> Task<Result<ModelHandle<Project>>> {
801 let client = self.client.clone();
802 let user_store = self.user_store.clone();
803 cx.spawn(|this, mut cx| async move {
804 let project =
805 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
806 this.update(&mut cx, |this, cx| {
807 this.joined_projects.retain(|project| {
808 if let Some(project) = project.upgrade(cx) {
809 !project.read(cx).is_read_only()
810 } else {
811 false
812 }
813 });
814 this.joined_projects.insert(project.downgrade());
815 });
816 Ok(project)
817 })
818 }
819
820 pub(crate) fn share_project(
821 &mut self,
822 project: ModelHandle<Project>,
823 cx: &mut ModelContext<Self>,
824 ) -> Task<Result<u64>> {
825 if let Some(project_id) = project.read(cx).remote_id() {
826 return Task::ready(Ok(project_id));
827 }
828
829 let request = self.client.request(proto::ShareProject {
830 room_id: self.id(),
831 worktrees: project.read(cx).worktree_metadata_protos(cx),
832 });
833 cx.spawn(|this, mut cx| async move {
834 let response = request.await?;
835
836 project.update(&mut cx, |project, cx| {
837 project.shared(response.project_id, cx)
838 })?;
839
840 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
841 this.update(&mut cx, |this, cx| {
842 this.shared_projects.insert(project.downgrade());
843 let active_project = this.local_participant.active_project.as_ref();
844 if active_project.map_or(false, |location| *location == project) {
845 this.set_location(Some(&project), cx)
846 } else {
847 Task::ready(Ok(()))
848 }
849 })
850 .await?;
851
852 Ok(response.project_id)
853 })
854 }
855
856 pub(crate) fn unshare_project(
857 &mut self,
858 project: ModelHandle<Project>,
859 cx: &mut ModelContext<Self>,
860 ) -> Result<()> {
861 let project_id = match project.read(cx).remote_id() {
862 Some(project_id) => project_id,
863 None => return Ok(()),
864 };
865
866 self.client.send(proto::UnshareProject { project_id })?;
867 project.update(cx, |this, cx| this.unshare(cx))
868 }
869
870 pub(crate) fn set_location(
871 &mut self,
872 project: Option<&ModelHandle<Project>>,
873 cx: &mut ModelContext<Self>,
874 ) -> Task<Result<()>> {
875 if self.status.is_offline() {
876 return Task::ready(Err(anyhow!("room is offline")));
877 }
878
879 let client = self.client.clone();
880 let room_id = self.id;
881 let location = if let Some(project) = project {
882 self.local_participant.active_project = Some(project.downgrade());
883 if let Some(project_id) = project.read(cx).remote_id() {
884 proto::participant_location::Variant::SharedProject(
885 proto::participant_location::SharedProject { id: project_id },
886 )
887 } else {
888 proto::participant_location::Variant::UnsharedProject(
889 proto::participant_location::UnsharedProject {},
890 )
891 }
892 } else {
893 self.local_participant.active_project = None;
894 proto::participant_location::Variant::External(proto::participant_location::External {})
895 };
896
897 cx.notify();
898 cx.foreground().spawn(async move {
899 client
900 .request(proto::UpdateParticipantLocation {
901 room_id,
902 location: Some(proto::ParticipantLocation {
903 variant: Some(location),
904 }),
905 })
906 .await?;
907 Ok(())
908 })
909 }
910
911 pub fn is_screen_sharing(&self) -> bool {
912 self.live_kit.as_ref().map_or(false, |live_kit| {
913 !matches!(live_kit.screen_track, ScreenTrack::None)
914 })
915 }
916
917 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
918 if self.status.is_offline() {
919 return Task::ready(Err(anyhow!("room is offline")));
920 } else if self.is_screen_sharing() {
921 return Task::ready(Err(anyhow!("screen was already shared")));
922 }
923
924 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
925 let publish_id = post_inc(&mut live_kit.next_publish_id);
926 live_kit.screen_track = ScreenTrack::Pending { publish_id };
927 cx.notify();
928 (live_kit.room.display_sources(), publish_id)
929 } else {
930 return Task::ready(Err(anyhow!("live-kit was not initialized")));
931 };
932
933 cx.spawn_weak(|this, mut cx| async move {
934 let publish_track = async {
935 let displays = displays.await?;
936 let display = displays
937 .first()
938 .ok_or_else(|| anyhow!("no display found"))?;
939 let track = LocalVideoTrack::screen_share_for_display(&display);
940 this.upgrade(&cx)
941 .ok_or_else(|| anyhow!("room was dropped"))?
942 .read_with(&cx, |this, _| {
943 this.live_kit
944 .as_ref()
945 .map(|live_kit| live_kit.room.publish_video_track(&track))
946 })
947 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
948 .await
949 };
950
951 let publication = publish_track.await;
952 this.upgrade(&cx)
953 .ok_or_else(|| anyhow!("room was dropped"))?
954 .update(&mut cx, |this, cx| {
955 let live_kit = this
956 .live_kit
957 .as_mut()
958 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
959
960 let canceled = if let ScreenTrack::Pending {
961 publish_id: cur_publish_id,
962 } = &live_kit.screen_track
963 {
964 *cur_publish_id != publish_id
965 } else {
966 true
967 };
968
969 match publication {
970 Ok(publication) => {
971 if canceled {
972 live_kit.room.unpublish_track(publication);
973 } else {
974 live_kit.screen_track = ScreenTrack::Published(publication);
975 cx.notify();
976 }
977 Ok(())
978 }
979 Err(error) => {
980 if canceled {
981 Ok(())
982 } else {
983 live_kit.screen_track = ScreenTrack::None;
984 cx.notify();
985 Err(error)
986 }
987 }
988 }
989 })
990 })
991 }
992
993 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
994 if self.status.is_offline() {
995 return Err(anyhow!("room is offline"));
996 }
997
998 let live_kit = self
999 .live_kit
1000 .as_mut()
1001 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1002 match mem::take(&mut live_kit.screen_track) {
1003 ScreenTrack::None => Err(anyhow!("screen was not shared")),
1004 ScreenTrack::Pending { .. } => {
1005 cx.notify();
1006 Ok(())
1007 }
1008 ScreenTrack::Published(track) => {
1009 live_kit.room.unpublish_track(track);
1010 cx.notify();
1011 Ok(())
1012 }
1013 }
1014 }
1015
1016 #[cfg(any(test, feature = "test-support"))]
1017 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1018 self.live_kit
1019 .as_ref()
1020 .unwrap()
1021 .room
1022 .set_display_sources(sources);
1023 }
1024}
1025
1026struct LiveKitRoom {
1027 room: Arc<live_kit_client::Room>,
1028 screen_track: ScreenTrack,
1029 next_publish_id: usize,
1030 _maintain_room: Task<()>,
1031 _maintain_tracks: Task<()>,
1032}
1033
1034enum ScreenTrack {
1035 None,
1036 Pending { publish_id: usize },
1037 Published(LocalTrackPublication),
1038}
1039
1040impl Default for ScreenTrack {
1041 fn default() -> Self {
1042 Self::None
1043 }
1044}
1045
1046#[derive(Copy, Clone, PartialEq, Eq)]
1047pub enum RoomStatus {
1048 Online,
1049 Rejoining,
1050 Offline,
1051}
1052
1053impl RoomStatus {
1054 pub fn is_offline(&self) -> bool {
1055 matches!(self, RoomStatus::Offline)
1056 }
1057
1058 pub fn is_online(&self) -> bool {
1059 matches!(self, RoomStatus::Online)
1060 }
1061}