1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
14use language::LanguageRegistry;
15use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
16use postage::stream::Stream;
17use project::Project;
18use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
19use util::{post_inc, ResultExt, TryFutureExt};
20
/// How long `Room::maintain_connection` waits, after detecting a client
/// disconnection, for the client to reconnect and the room to be rejoined
/// before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
22
/// Events emitted by a [`Room`] (it is the `Entity::Event` type of `Room`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a location that differs
    /// from the one previously stored for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track was added to or removed from a remote participant.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not
    /// in that participant's previously stored project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project that a remote participant used to list is no longer listed,
    /// or the participant that listed it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave` just before the room is torn down.
    Left,
}
41
/// Client-side model of a call room: its participants, the projects shared or
/// joined through it, the optional LiveKit session, and the tasks that keep
/// it in sync with the server.
pub struct Room {
    /// Server-assigned room id, sent along with room-scoped requests.
    id: u64,
    /// LiveKit state; `None` when no connection info was supplied to `new`
    /// or after the room has been left.
    live_kit: Option<LiveKitRoom>,
    /// Current connectivity state of the room.
    status: RoomStatus,
    /// Projects the local user has shared through `share_project`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Remote projects the local user has joined through `join_project`.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    /// State of the local user within the room (projects, active project).
    local_participant: LocalParticipant,
    /// Remote participants, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending participants by the latest applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `call` requests currently in flight.
    pending_call_count: usize,
    /// When set, the room leaves itself once `should_leave` holds.
    leave_when_empty: bool,
    /// RPC client used for all room requests.
    client: Arc<Client>,
    /// Store used to resolve participant user ids into `User`s.
    user_store: ModelHandle<UserStore>,
    /// Followers of a leader within a project, keyed by `(leader, project_id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions registered on `client`; cleared on leave.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, if one is still running.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; taken (dropped) on leave.
    maintain_connection: Option<Task<Option<()>>>,
}
61
62impl Entity for Room {
63 type Event = Event;
64
65 fn release(&mut self, cx: &mut AppContext) {
66 if self.status.is_online() {
67 self.leave_internal(cx).detach_and_log_err(cx);
68 }
69 }
70
71 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
72 if self.status.is_online() {
73 let leave = self.leave_internal(cx);
74 Some(
75 cx.background()
76 .spawn(async move {
77 leave.await.log_err();
78 })
79 .boxed(),
80 )
81 } else {
82 None
83 }
84 }
85}
86
87impl Room {
88 fn new(
89 id: u64,
90 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
91 client: Arc<Client>,
92 user_store: ModelHandle<UserStore>,
93 cx: &mut ModelContext<Self>,
94 ) -> Self {
95 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
96 let room = live_kit_client::Room::new();
97 let mut status = room.status();
98 // Consume the initial status of the room.
99 let _ = status.try_recv();
100 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
101 while let Some(status) = status.next().await {
102 let this = if let Some(this) = this.upgrade(&cx) {
103 this
104 } else {
105 break;
106 };
107
108 if status == live_kit_client::ConnectionState::Disconnected {
109 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
110 break;
111 }
112 }
113 });
114
115 let mut track_changes = room.remote_video_track_updates();
116 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
117 while let Some(track_change) = track_changes.next().await {
118 let this = if let Some(this) = this.upgrade(&cx) {
119 this
120 } else {
121 break;
122 };
123
124 this.update(&mut cx, |this, cx| {
125 this.remote_video_track_updated(track_change, cx).log_err()
126 });
127 }
128 });
129
130 cx.foreground()
131 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
132 .detach_and_log_err(cx);
133
134 Some(LiveKitRoom {
135 room,
136 screen_track: ScreenTrack::None,
137 next_publish_id: 0,
138 _maintain_room,
139 _maintain_tracks,
140 })
141 } else {
142 None
143 };
144
145 let maintain_connection =
146 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
147
148 Self {
149 id,
150 live_kit: live_kit_room,
151 status: RoomStatus::Online,
152 shared_projects: Default::default(),
153 joined_projects: Default::default(),
154 participant_user_ids: Default::default(),
155 local_participant: Default::default(),
156 remote_participants: Default::default(),
157 pending_participants: Default::default(),
158 pending_call_count: 0,
159 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
160 leave_when_empty: false,
161 pending_room_update: None,
162 client,
163 user_store,
164 follows_by_leader_id_project_id: Default::default(),
165 maintain_connection: Some(maintain_connection),
166 }
167 }
168
169 pub(crate) fn create(
170 called_user_id: u64,
171 initial_project: Option<ModelHandle<Project>>,
172 client: Arc<Client>,
173 user_store: ModelHandle<UserStore>,
174 cx: &mut AppContext,
175 ) -> Task<Result<ModelHandle<Self>>> {
176 cx.spawn(|mut cx| async move {
177 let response = client.request(proto::CreateRoom {}).await?;
178 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
179 let room = cx.add_model(|cx| {
180 Self::new(
181 room_proto.id,
182 response.live_kit_connection_info,
183 client,
184 user_store,
185 cx,
186 )
187 });
188
189 let initial_project_id = if let Some(initial_project) = initial_project {
190 let initial_project_id = room
191 .update(&mut cx, |room, cx| {
192 room.share_project(initial_project.clone(), cx)
193 })
194 .await?;
195 Some(initial_project_id)
196 } else {
197 None
198 };
199
200 match room
201 .update(&mut cx, |room, cx| {
202 room.leave_when_empty = true;
203 room.call(called_user_id, initial_project_id, cx)
204 })
205 .await
206 {
207 Ok(()) => Ok(room),
208 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
209 }
210 })
211 }
212
213 pub(crate) fn join(
214 call: &IncomingCall,
215 client: Arc<Client>,
216 user_store: ModelHandle<UserStore>,
217 cx: &mut AppContext,
218 ) -> Task<Result<ModelHandle<Self>>> {
219 let room_id = call.room_id;
220 cx.spawn(|mut cx| async move {
221 let response = client.request(proto::JoinRoom { id: room_id }).await?;
222 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
223 let room = cx.add_model(|cx| {
224 Self::new(
225 room_id,
226 response.live_kit_connection_info,
227 client,
228 user_store,
229 cx,
230 )
231 });
232 room.update(&mut cx, |room, cx| {
233 room.leave_when_empty = true;
234 room.apply_room_update(room_proto, cx)?;
235 anyhow::Ok(())
236 })?;
237 Ok(room)
238 })
239 }
240
241 fn should_leave(&self) -> bool {
242 self.leave_when_empty
243 && self.pending_room_update.is_none()
244 && self.pending_participants.is_empty()
245 && self.remote_participants.is_empty()
246 && self.pending_call_count == 0
247 }
248
/// Leaves the room: notifies observers, emits [`Event::Left`], and then
/// delegates the teardown to `leave_internal`, whose task is returned.
///
/// The notification and event are issued before `leave_internal` checks
/// whether the room is already offline, so they fire in that case too (the
/// returned task then resolves to an error).
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    cx.notify();
    cx.emit(Event::Left);
    self.leave_internal(cx)
}
254
255 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
256 if self.status.is_offline() {
257 return Task::ready(Err(anyhow!("room is offline")));
258 }
259
260 log::info!("leaving room");
261
262 for project in self.shared_projects.drain() {
263 if let Some(project) = project.upgrade(cx) {
264 project.update(cx, |project, cx| {
265 project.unshare(cx).log_err();
266 });
267 }
268 }
269 for project in self.joined_projects.drain() {
270 if let Some(project) = project.upgrade(cx) {
271 project.update(cx, |project, cx| {
272 project.disconnected_from_host(cx);
273 project.close(cx);
274 });
275 }
276 }
277
278 self.status = RoomStatus::Offline;
279 self.remote_participants.clear();
280 self.pending_participants.clear();
281 self.participant_user_ids.clear();
282 self.subscriptions.clear();
283 self.live_kit.take();
284 self.pending_room_update.take();
285 self.maintain_connection.take();
286
287 let leave_room = self.client.request(proto::LeaveRoom {});
288 cx.background().spawn(async move {
289 leave_room.await?;
290 anyhow::Ok(())
291 })
292 }
293
/// Watches the client's connection status for as long as the room exists and
/// tries to rejoin the room after a disconnection.
///
/// On each detected disconnection the room is marked `Rejoining`, then the
/// function races two futures: a `RECONNECT_TIMEOUT` timer and a loop that
/// waits for the client to be connected again and calls `rejoin` (giving up
/// after three failed rejoin attempts, or as soon as the client is signed
/// out). A successful rejoin goes back to watching the status; anything else
/// falls through to leaving the room.
///
/// # Errors
/// This function never returns `Ok`: it returns an error if the room was
/// dropped, or after reconnection failed and the room (if still alive) was
/// told to leave.
async fn maintain_connection(
    this: WeakModelHandle<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Discard a status value that is already available, if any, before
        // sampling the current connection state.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                });

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                let client_reconnection = async {
                    // Only failed `rejoin` calls consume an attempt; status
                    // changes while still disconnected do not.
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        if client_status.borrow().is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade(&cx) else { break };
                            if this
                                .update(&mut cx, |this, cx| this.rejoin(cx))
                                .await
                                .log_err()
                                .is_some()
                            {
                                return true;
                            } else {
                                remaining_attempts -= 1;
                            }
                        } else if client_status.borrow().is_signed_out() {
                            return false;
                        }

                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        client_status.next().await;
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // The reconnection branch is listed first, so with the biased
                // select it is polled before the timeout.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            break;
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade(&cx) {
        log::info!("reconnection failed, leaving room");
        let _ = this.update(&mut cx, |this, cx| this.leave(cx));
    }
    Err(anyhow!(
        "can't reconnect to room: client failed to re-establish connection"
    ))
}
379
380 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
381 let mut projects = HashMap::default();
382 let mut reshared_projects = Vec::new();
383 let mut rejoined_projects = Vec::new();
384 self.shared_projects.retain(|project| {
385 if let Some(handle) = project.upgrade(cx) {
386 let project = handle.read(cx);
387 if let Some(project_id) = project.remote_id() {
388 projects.insert(project_id, handle.clone());
389 reshared_projects.push(proto::UpdateProject {
390 project_id,
391 worktrees: project.worktree_metadata_protos(cx),
392 });
393 return true;
394 }
395 }
396 false
397 });
398 self.joined_projects.retain(|project| {
399 if let Some(handle) = project.upgrade(cx) {
400 let project = handle.read(cx);
401 if let Some(project_id) = project.remote_id() {
402 projects.insert(project_id, handle.clone());
403 rejoined_projects.push(proto::RejoinProject {
404 id: project_id,
405 worktrees: project
406 .worktrees(cx)
407 .map(|worktree| {
408 let worktree = worktree.read(cx);
409 proto::RejoinWorktree {
410 id: worktree.id().to_proto(),
411 scan_id: worktree.completed_scan_id() as u64,
412 }
413 })
414 .collect(),
415 });
416 }
417 return true;
418 }
419 false
420 });
421
422 let response = self.client.request_envelope(proto::RejoinRoom {
423 id: self.id,
424 reshared_projects,
425 rejoined_projects,
426 });
427
428 cx.spawn(|this, mut cx| async move {
429 let response = response.await?;
430 let message_id = response.message_id;
431 let response = response.payload;
432 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
433 this.update(&mut cx, |this, cx| {
434 this.status = RoomStatus::Online;
435 this.apply_room_update(room_proto, cx)?;
436
437 for reshared_project in response.reshared_projects {
438 if let Some(project) = projects.get(&reshared_project.id) {
439 project.update(cx, |project, cx| {
440 project.reshared(reshared_project, cx).log_err();
441 });
442 }
443 }
444
445 for rejoined_project in response.rejoined_projects {
446 if let Some(project) = projects.get(&rejoined_project.id) {
447 project.update(cx, |project, cx| {
448 project.rejoined(rejoined_project, message_id, cx).log_err();
449 });
450 }
451 }
452
453 anyhow::Ok(())
454 })
455 })
456 }
457
/// Returns the id of this room.
pub fn id(&self) -> u64 {
    self.id
}
461
/// Returns the room's current connectivity status.
pub fn status(&self) -> RoomStatus {
    self.status
}
465
/// Returns the local user's participant state.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
469
/// Returns the remote participants currently in the room, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
473
474 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
475 self.remote_participants
476 .values()
477 .find(|p| p.peer_id == peer_id)
478 }
479
/// Returns the users recorded as pending participants by the most recently
/// applied room update.
pub fn pending_participants(&self) -> &[Arc<User>] {
    &self.pending_participants
}
483
/// Returns whether `user_id` is tracked as a remote or pending participant
/// of this room.
pub fn contains_participant(&self, user_id: u64) -> bool {
    self.participant_user_ids.contains(&user_id)
}
487
488 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
489 self.follows_by_leader_id_project_id
490 .get(&(leader_id, project_id))
491 .map_or(&[], |v| v.as_slice())
492 }
493
494 async fn handle_room_updated(
495 this: ModelHandle<Self>,
496 envelope: TypedEnvelope<proto::RoomUpdated>,
497 _: Arc<Client>,
498 mut cx: AsyncAppContext,
499 ) -> Result<()> {
500 let room = envelope
501 .payload
502 .room
503 .ok_or_else(|| anyhow!("invalid room"))?;
504 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
505 }
506
/// Reconciles the local room state with a room description from the server.
///
/// The local user is removed from the participant list, the users for the
/// remote and pending participants are requested from the user store, and a
/// task is stored in `pending_room_update` that, once both lookups finish,
/// updates participants, projects, locations, followers, emits the matching
/// events, and leaves the room if `should_leave` holds afterwards.
///
/// Storing the new task replaces (and thereby drops) any task from a
/// previous update that has not finished yet.
///
/// As written, the synchronous part has no failure path and always returns
/// `Ok(())`; failed user lookups are logged inside the task.
fn apply_room_update(
    &mut self,
    mut room: proto::Room,
    cx: &mut ModelContext<Self>,
) -> Result<()> {
    // Filter ourselves out from the room's participants.
    let local_participant_ix = room
        .participants
        .iter()
        .position(|participant| Some(participant.user_id) == self.client.user_id());
    let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

    let pending_participant_user_ids = room
        .pending_participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    let remote_participant_user_ids = room
        .participants
        .iter()
        .map(|p| p.user_id)
        .collect::<Vec<_>>();

    // Kick off both user lookups; they are awaited inside the task below.
    let (remote_participants, pending_participants) =
        self.user_store.update(cx, move |user_store, cx| {
            (
                user_store.get_users(remote_participant_user_ids, cx),
                user_store.get_users(pending_participant_user_ids, cx),
            )
        });

    self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
        let (remote_participants, pending_participants) =
            futures::join!(remote_participants, pending_participants);

        this.update(&mut cx, |this, cx| {
            // Rebuilt from scratch below from the remote and pending lists.
            this.participant_user_ids.clear();

            if let Some(participant) = local_participant {
                this.local_participant.projects = participant.projects;
            } else {
                this.local_participant.projects.clear();
            }

            if let Some(participants) = remote_participants.log_err() {
                // NOTE(review): pairing by `zip` assumes the user store returns
                // users in the same order as the requested ids — confirm.
                for (participant, user) in room.participants.into_iter().zip(participants) {
                    let Some(peer_id) = participant.peer_id else { continue };
                    this.participant_user_ids.insert(participant.user_id);

                    // Project ids this participant listed before vs. now.
                    let old_projects = this
                        .remote_participants
                        .get(&participant.user_id)
                        .into_iter()
                        .flat_map(|existing| &existing.projects)
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();
                    let new_projects = participant
                        .projects
                        .iter()
                        .map(|project| project.id)
                        .collect::<HashSet<_>>();

                    for project in &participant.projects {
                        if !old_projects.contains(&project.id) {
                            cx.emit(Event::RemoteProjectShared {
                                owner: user.clone(),
                                project_id: project.id,
                                worktree_root_names: project.worktree_root_names.clone(),
                            });
                        }
                    }

                    for unshared_project_id in old_projects.difference(&new_projects) {
                        // Forget joined projects that were just unshared (after
                        // marking them disconnected) and any that were dropped.
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade(cx) {
                                project.update(cx, |project, cx| {
                                    if project.remote_id() == Some(*unshared_project_id) {
                                        project.disconnected_from_host(cx);
                                        false
                                    } else {
                                        true
                                    }
                                })
                            } else {
                                false
                            }
                        });
                        cx.emit(Event::RemoteProjectUnshared {
                            project_id: *unshared_project_id,
                        });
                    }

                    // A location that fails to convert is treated as external.
                    let location = ParticipantLocation::from_proto(participant.location)
                        .unwrap_or(ParticipantLocation::External);
                    if let Some(remote_participant) =
                        this.remote_participants.get_mut(&participant.user_id)
                    {
                        remote_participant.projects = participant.projects;
                        remote_participant.peer_id = peer_id;
                        if location != remote_participant.location {
                            remote_participant.location = location;
                            cx.emit(Event::ParticipantLocationChanged {
                                participant_id: peer_id,
                            });
                        }
                    } else {
                        this.remote_participants.insert(
                            participant.user_id,
                            RemoteParticipant {
                                user: user.clone(),
                                peer_id,
                                projects: participant.projects,
                                location,
                                tracks: Default::default(),
                            },
                        );

                        // Register video tracks LiveKit already knows about
                        // for this newly added participant.
                        if let Some(live_kit) = this.live_kit.as_ref() {
                            let tracks =
                                live_kit.room.remote_video_tracks(&user.id.to_string());
                            for track in tracks {
                                this.remote_video_track_updated(
                                    RemoteVideoTrackUpdate::Subscribed(track),
                                    cx,
                                )
                                .log_err();
                            }
                        }
                    }
                }

                // Drop participants that are no longer listed, announcing each
                // of their projects as unshared.
                this.remote_participants.retain(|user_id, participant| {
                    if this.participant_user_ids.contains(user_id) {
                        true
                    } else {
                        for project in &participant.projects {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: project.id,
                            });
                        }
                        false
                    }
                });
            }

            if let Some(pending_participants) = pending_participants.log_err() {
                this.pending_participants = pending_participants;
                for participant in &this.pending_participants {
                    this.participant_user_ids.insert(participant.id);
                }
            }

            // Rebuild the follower index from the update, skipping duplicates.
            this.follows_by_leader_id_project_id.clear();
            for follower in room.followers {
                let project_id = follower.project_id;
                let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                    (Some(leader), Some(follower)) => (leader, follower),

                    _ => {
                        log::error!("Follower message {follower:?} missing some state");
                        continue;
                    }
                };

                let list = this
                    .follows_by_leader_id_project_id
                    .entry((leader, project_id))
                    .or_insert(Vec::new());
                if !list.contains(&follower) {
                    list.push(follower);
                }
            }

            // Clear the pending marker first: `should_leave` requires that no
            // room update is pending.
            this.pending_room_update.take();
            if this.should_leave() {
                log::info!("room is empty, leaving");
                let _ = this.leave(cx);
            }

            this.check_invariants();
            cx.notify();
        });
    }));

    cx.notify();
    Ok(())
}
695
696 fn remote_video_track_updated(
697 &mut self,
698 change: RemoteVideoTrackUpdate,
699 cx: &mut ModelContext<Self>,
700 ) -> Result<()> {
701 match change {
702 RemoteVideoTrackUpdate::Subscribed(track) => {
703 let user_id = track.publisher_id().parse()?;
704 let track_id = track.sid().to_string();
705 let participant = self
706 .remote_participants
707 .get_mut(&user_id)
708 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
709 participant.tracks.insert(
710 track_id.clone(),
711 Arc::new(RemoteVideoTrack {
712 live_kit_track: track,
713 }),
714 );
715 cx.emit(Event::RemoteVideoTracksChanged {
716 participant_id: participant.peer_id,
717 });
718 }
719 RemoteVideoTrackUpdate::Unsubscribed {
720 publisher_id,
721 track_id,
722 } => {
723 let user_id = publisher_id.parse()?;
724 let participant = self
725 .remote_participants
726 .get_mut(&user_id)
727 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
728 participant.tracks.remove(&track_id);
729 cx.emit(Event::RemoteVideoTracksChanged {
730 participant_id: participant.peer_id,
731 });
732 }
733 }
734
735 cx.notify();
736 Ok(())
737 }
738
/// Asserts internal consistency of the participant bookkeeping.
///
/// The checks are compiled only for tests or with the `test-support`
/// feature; otherwise this function does nothing. It panics if a remote or
/// pending participant is missing from `participant_user_ids`, if one of
/// them is the local user, or if `participant_user_ids` has a different size
/// than the two participant collections combined.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            // The `unwrap` panics if the client has no user id.
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        // Remote and pending participants must not overlap.
        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
758
759 pub(crate) fn call(
760 &mut self,
761 called_user_id: u64,
762 initial_project_id: Option<u64>,
763 cx: &mut ModelContext<Self>,
764 ) -> Task<Result<()>> {
765 if self.status.is_offline() {
766 return Task::ready(Err(anyhow!("room is offline")));
767 }
768
769 cx.notify();
770 let client = self.client.clone();
771 let room_id = self.id;
772 self.pending_call_count += 1;
773 cx.spawn(|this, mut cx| async move {
774 let result = client
775 .request(proto::Call {
776 room_id,
777 called_user_id,
778 initial_project_id,
779 })
780 .await;
781 this.update(&mut cx, |this, cx| {
782 this.pending_call_count -= 1;
783 if this.should_leave() {
784 this.leave(cx).detach_and_log_err(cx);
785 }
786 });
787 result?;
788 Ok(())
789 })
790 }
791
792 pub fn join_project(
793 &mut self,
794 id: u64,
795 language_registry: Arc<LanguageRegistry>,
796 fs: Arc<dyn Fs>,
797 cx: &mut ModelContext<Self>,
798 ) -> Task<Result<ModelHandle<Project>>> {
799 let client = self.client.clone();
800 let user_store = self.user_store.clone();
801 cx.spawn(|this, mut cx| async move {
802 let project =
803 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
804 this.update(&mut cx, |this, cx| {
805 this.joined_projects.retain(|project| {
806 if let Some(project) = project.upgrade(cx) {
807 !project.read(cx).is_read_only()
808 } else {
809 false
810 }
811 });
812 this.joined_projects.insert(project.downgrade());
813 });
814 Ok(project)
815 })
816 }
817
818 pub(crate) fn share_project(
819 &mut self,
820 project: ModelHandle<Project>,
821 cx: &mut ModelContext<Self>,
822 ) -> Task<Result<u64>> {
823 if let Some(project_id) = project.read(cx).remote_id() {
824 return Task::ready(Ok(project_id));
825 }
826
827 let request = self.client.request(proto::ShareProject {
828 room_id: self.id(),
829 worktrees: project.read(cx).worktree_metadata_protos(cx),
830 });
831 cx.spawn(|this, mut cx| async move {
832 let response = request.await?;
833
834 project.update(&mut cx, |project, cx| {
835 project.shared(response.project_id, cx)
836 })?;
837
838 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
839 this.update(&mut cx, |this, cx| {
840 this.shared_projects.insert(project.downgrade());
841 let active_project = this.local_participant.active_project.as_ref();
842 if active_project.map_or(false, |location| *location == project) {
843 this.set_location(Some(&project), cx)
844 } else {
845 Task::ready(Ok(()))
846 }
847 })
848 .await?;
849
850 Ok(response.project_id)
851 })
852 }
853
854 pub(crate) fn unshare_project(
855 &mut self,
856 project: ModelHandle<Project>,
857 cx: &mut ModelContext<Self>,
858 ) -> Result<()> {
859 let project_id = match project.read(cx).remote_id() {
860 Some(project_id) => project_id,
861 None => return Ok(()),
862 };
863
864 self.client.send(proto::UnshareProject { project_id })?;
865 project.update(cx, |this, cx| this.unshare(cx))
866 }
867
868 pub(crate) fn set_location(
869 &mut self,
870 project: Option<&ModelHandle<Project>>,
871 cx: &mut ModelContext<Self>,
872 ) -> Task<Result<()>> {
873 if self.status.is_offline() {
874 return Task::ready(Err(anyhow!("room is offline")));
875 }
876
877 let client = self.client.clone();
878 let room_id = self.id;
879 let location = if let Some(project) = project {
880 self.local_participant.active_project = Some(project.downgrade());
881 if let Some(project_id) = project.read(cx).remote_id() {
882 proto::participant_location::Variant::SharedProject(
883 proto::participant_location::SharedProject { id: project_id },
884 )
885 } else {
886 proto::participant_location::Variant::UnsharedProject(
887 proto::participant_location::UnsharedProject {},
888 )
889 }
890 } else {
891 self.local_participant.active_project = None;
892 proto::participant_location::Variant::External(proto::participant_location::External {})
893 };
894
895 cx.notify();
896 cx.foreground().spawn(async move {
897 client
898 .request(proto::UpdateParticipantLocation {
899 room_id,
900 location: Some(proto::ParticipantLocation {
901 variant: Some(location),
902 }),
903 })
904 .await?;
905 Ok(())
906 })
907 }
908
909 pub fn is_screen_sharing(&self) -> bool {
910 self.live_kit.as_ref().map_or(false, |live_kit| {
911 !matches!(live_kit.screen_track, ScreenTrack::None)
912 })
913 }
914
915 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
916 if self.status.is_offline() {
917 return Task::ready(Err(anyhow!("room is offline")));
918 } else if self.is_screen_sharing() {
919 return Task::ready(Err(anyhow!("screen was already shared")));
920 }
921
922 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
923 let publish_id = post_inc(&mut live_kit.next_publish_id);
924 live_kit.screen_track = ScreenTrack::Pending { publish_id };
925 cx.notify();
926 (live_kit.room.display_sources(), publish_id)
927 } else {
928 return Task::ready(Err(anyhow!("live-kit was not initialized")));
929 };
930
931 cx.spawn_weak(|this, mut cx| async move {
932 let publish_track = async {
933 let displays = displays.await?;
934 let display = displays
935 .first()
936 .ok_or_else(|| anyhow!("no display found"))?;
937 let track = LocalVideoTrack::screen_share_for_display(&display);
938 this.upgrade(&cx)
939 .ok_or_else(|| anyhow!("room was dropped"))?
940 .read_with(&cx, |this, _| {
941 this.live_kit
942 .as_ref()
943 .map(|live_kit| live_kit.room.publish_video_track(&track))
944 })
945 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
946 .await
947 };
948
949 let publication = publish_track.await;
950 this.upgrade(&cx)
951 .ok_or_else(|| anyhow!("room was dropped"))?
952 .update(&mut cx, |this, cx| {
953 let live_kit = this
954 .live_kit
955 .as_mut()
956 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
957
958 let canceled = if let ScreenTrack::Pending {
959 publish_id: cur_publish_id,
960 } = &live_kit.screen_track
961 {
962 *cur_publish_id != publish_id
963 } else {
964 true
965 };
966
967 match publication {
968 Ok(publication) => {
969 if canceled {
970 live_kit.room.unpublish_track(publication);
971 } else {
972 live_kit.screen_track = ScreenTrack::Published(publication);
973 cx.notify();
974 }
975 Ok(())
976 }
977 Err(error) => {
978 if canceled {
979 Ok(())
980 } else {
981 live_kit.screen_track = ScreenTrack::None;
982 cx.notify();
983 Err(error)
984 }
985 }
986 }
987 })
988 })
989 }
990
991 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
992 if self.status.is_offline() {
993 return Err(anyhow!("room is offline"));
994 }
995
996 let live_kit = self
997 .live_kit
998 .as_mut()
999 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1000 match mem::take(&mut live_kit.screen_track) {
1001 ScreenTrack::None => Err(anyhow!("screen was not shared")),
1002 ScreenTrack::Pending { .. } => {
1003 cx.notify();
1004 Ok(())
1005 }
1006 ScreenTrack::Published(track) => {
1007 live_kit.room.unpublish_track(track);
1008 cx.notify();
1009 Ok(())
1010 }
1011 }
1012 }
1013
/// Test-only helper that forwards `sources` to the LiveKit room's
/// `set_display_sources`.
///
/// # Panics
/// Panics if the room has no LiveKit state.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    self.live_kit
        .as_ref()
        .unwrap()
        .room
        .set_display_sources(sources);
}
1022}
1023
/// LiveKit-related state owned by a [`Room`].
struct LiveKitRoom {
    /// Handle to the underlying LiveKit room.
    room: Arc<live_kit_client::Room>,
    /// State of the local user's screen share.
    screen_track: ScreenTrack,
    /// Counter from which `share_screen` draws the id of each publish
    /// attempt (post-incremented on every attempt).
    next_publish_id: usize,
    /// Task that leaves the room when LiveKit reports a disconnection.
    /// Held here, presumably so it is dropped together with this struct.
    _maintain_room: Task<()>,
    /// Task that forwards remote video track updates to the room.
    /// Held here, presumably so it is dropped together with this struct.
    _maintain_tracks: Task<()>,
}
1031
1032enum ScreenTrack {
1033 None,
1034 Pending { publish_id: usize },
1035 Published(LocalTrackPublication),
1036}
1037
1038impl Default for ScreenTrack {
1039 fn default() -> Self {
1040 Self::None
1041 }
1042}
1043
/// Connectivity state of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined and connected.
    Online,
    /// The client connection was lost and a rejoin is being attempted.
    Rejoining,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; note that
    /// [`RoomStatus::Rejoining`] is neither online nor offline.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}