1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
14use language::LanguageRegistry;
15use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
16use postage::stream::Stream;
17use project::Project;
18use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
19use util::{post_inc, ResultExt, TryFutureExt};
20
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
22
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Emitted when a room update reports a location for an already-known
    /// remote participant that differs from the one stored locally.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted after a remote video track was added to or removed from the
    /// participant's set of tracks.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted when a room update lists a project for a remote participant
    /// that was not in that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// Emitted when a previously known remote project disappears from a room
    /// update, or when the participant owning it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
41
/// Client-side state of a call room: its participants, the projects shared
/// into or joined from it, and the tasks that keep it connected.
pub struct Room {
    /// Room identifier sent along with room-scoped requests.
    id: u64,
    /// LiveKit state; `None` when no connection info was supplied at
    /// construction or after the room has been left.
    live_kit: Option<LiveKitRoom>,
    /// Current connectivity state of the room.
    status: RoomStatus,
    /// Local projects that were shared into this room via `share_project`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Remote projects that were opened via `join_project`.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending in the most recently applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of both remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `call` requests that have not completed yet.
    pending_call_count: usize,
    /// When set, the room leaves itself as soon as `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Follower peer ids, keyed by `(leader peer id, project id)`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions; cleared when the room is left.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update; `None` when none is in flight.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped when the room is left.
    maintain_connection: Option<Task<Option<()>>>,
}
61
62impl Entity for Room {
63 type Event = Event;
64
65 fn release(&mut self, cx: &mut AppContext) {
66 if self.status.is_online() {
67 self.leave_internal(cx).detach_and_log_err(cx);
68 }
69 }
70
71 fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
72 if self.status.is_online() {
73 let leave = self.leave_internal(cx);
74 Some(
75 cx.background()
76 .spawn(async move {
77 leave.await.log_err();
78 })
79 .boxed(),
80 )
81 } else {
82 None
83 }
84 }
85}
86
87impl Room {
88 fn new(
89 id: u64,
90 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
91 client: Arc<Client>,
92 user_store: ModelHandle<UserStore>,
93 cx: &mut ModelContext<Self>,
94 ) -> Self {
95 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
96 let room = live_kit_client::Room::new();
97 let mut status = room.status();
98 // Consume the initial status of the room.
99 let _ = status.try_recv();
100 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
101 while let Some(status) = status.next().await {
102 let this = if let Some(this) = this.upgrade(&cx) {
103 this
104 } else {
105 break;
106 };
107
108 if status == live_kit_client::ConnectionState::Disconnected {
109 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
110 break;
111 }
112 }
113 });
114
115 let mut track_changes = room.remote_video_track_updates();
116 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
117 while let Some(track_change) = track_changes.next().await {
118 let this = if let Some(this) = this.upgrade(&cx) {
119 this
120 } else {
121 break;
122 };
123
124 this.update(&mut cx, |this, cx| {
125 this.remote_video_track_updated(track_change, cx).log_err()
126 });
127 }
128 });
129
130 cx.foreground()
131 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
132 .detach_and_log_err(cx);
133
134 Some(LiveKitRoom {
135 room,
136 screen_track: ScreenTrack::None,
137 next_publish_id: 0,
138 _maintain_room,
139 _maintain_tracks,
140 })
141 } else {
142 None
143 };
144
145 let maintain_connection =
146 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
147
148 Self {
149 id,
150 live_kit: live_kit_room,
151 status: RoomStatus::Online,
152 shared_projects: Default::default(),
153 joined_projects: Default::default(),
154 participant_user_ids: Default::default(),
155 local_participant: Default::default(),
156 remote_participants: Default::default(),
157 pending_participants: Default::default(),
158 pending_call_count: 0,
159 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
160 leave_when_empty: false,
161 pending_room_update: None,
162 client,
163 user_store,
164 follows_by_leader_id_project_id: Default::default(),
165 maintain_connection: Some(maintain_connection),
166 }
167 }
168
169 pub(crate) fn create(
170 called_user_id: u64,
171 initial_project: Option<ModelHandle<Project>>,
172 client: Arc<Client>,
173 user_store: ModelHandle<UserStore>,
174 cx: &mut AppContext,
175 ) -> Task<Result<ModelHandle<Self>>> {
176 cx.spawn(|mut cx| async move {
177 let response = client.request(proto::CreateRoom {}).await?;
178 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
179 let room = cx.add_model(|cx| {
180 Self::new(
181 room_proto.id,
182 response.live_kit_connection_info,
183 client,
184 user_store,
185 cx,
186 )
187 });
188
189 let initial_project_id = if let Some(initial_project) = initial_project {
190 let initial_project_id = room
191 .update(&mut cx, |room, cx| {
192 room.share_project(initial_project.clone(), cx)
193 })
194 .await?;
195 Some(initial_project_id)
196 } else {
197 None
198 };
199
200 match room
201 .update(&mut cx, |room, cx| {
202 room.leave_when_empty = true;
203 room.call(called_user_id, initial_project_id, cx)
204 })
205 .await
206 {
207 Ok(()) => Ok(room),
208 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
209 }
210 })
211 }
212
213 pub(crate) fn join(
214 call: &IncomingCall,
215 client: Arc<Client>,
216 user_store: ModelHandle<UserStore>,
217 cx: &mut AppContext,
218 ) -> Task<Result<ModelHandle<Self>>> {
219 let room_id = call.room_id;
220 cx.spawn(|mut cx| async move {
221 let response = client.request(proto::JoinRoom { id: room_id }).await?;
222 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
223 let room = cx.add_model(|cx| {
224 Self::new(
225 room_id,
226 response.live_kit_connection_info,
227 client,
228 user_store,
229 cx,
230 )
231 });
232 room.update(&mut cx, |room, cx| {
233 room.leave_when_empty = true;
234 room.apply_room_update(room_proto, cx)?;
235 anyhow::Ok(())
236 })?;
237 Ok(room)
238 })
239 }
240
241 fn should_leave(&self) -> bool {
242 self.leave_when_empty
243 && self.pending_room_update.is_none()
244 && self.pending_participants.is_empty()
245 && self.remote_participants.is_empty()
246 && self.pending_call_count == 0
247 }
248
    /// Notifies observers, emits [`Event::Left`], and then tears the room down
    /// via `leave_internal`, returning its task.
    ///
    /// Note that the notification and the event are produced before
    /// `leave_internal` checks whether the room is already offline.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
254
255 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
256 if self.status.is_offline() {
257 return Task::ready(Err(anyhow!("room is offline")));
258 }
259
260 log::info!("leaving room");
261
262 for project in self.shared_projects.drain() {
263 if let Some(project) = project.upgrade(cx) {
264 project.update(cx, |project, cx| {
265 project.unshare(cx).log_err();
266 });
267 }
268 }
269 for project in self.joined_projects.drain() {
270 if let Some(project) = project.upgrade(cx) {
271 project.update(cx, |project, cx| {
272 project.disconnected_from_host(cx);
273 project.close(cx);
274 });
275 }
276 }
277
278 self.status = RoomStatus::Offline;
279 self.remote_participants.clear();
280 self.pending_participants.clear();
281 self.participant_user_ids.clear();
282 self.subscriptions.clear();
283 self.live_kit.take();
284 self.pending_room_update.take();
285 self.maintain_connection.take();
286
287 let leave_room = self.client.request(proto::LeaveRoom {});
288 cx.background().spawn(async move {
289 leave_room.await?;
290 anyhow::Ok(())
291 })
292 }
293
    /// Watches the client's connection status and tries to rejoin the room
    /// whenever the status changes.
    ///
    /// On every detected disconnection the room is switched to
    /// [`RoomStatus::Rejoining`] and a rejoin is attempted (up to 3 failed
    /// rejoin attempts) each time the client reports being connected, all
    /// bounded by [`RECONNECT_TIMEOUT`]. If rejoining succeeds the loop keeps
    /// watching; otherwise the room is left (if it still exists) and an error
    /// is returned. An error is also returned if the room model was dropped
    /// when a disconnection is detected.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Discard any status that is already queued, then look at the current one.
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeded, and to `false` when the
                    // attempts are used up, the client signed out, or the room was dropped.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Biased select: the reconnection future is polled before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                // Reached only when rejoining failed or timed out.
                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            // The task returned by `leave` is intentionally not awaited here.
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
379
380 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
381 let mut projects = HashMap::default();
382 let mut reshared_projects = Vec::new();
383 let mut rejoined_projects = Vec::new();
384 self.shared_projects.retain(|project| {
385 if let Some(handle) = project.upgrade(cx) {
386 let project = handle.read(cx);
387 if let Some(project_id) = project.remote_id() {
388 projects.insert(project_id, handle.clone());
389 reshared_projects.push(proto::UpdateProject {
390 project_id,
391 worktrees: project.worktree_metadata_protos(cx),
392 });
393 return true;
394 }
395 }
396 false
397 });
398 self.joined_projects.retain(|project| {
399 if let Some(handle) = project.upgrade(cx) {
400 let project = handle.read(cx);
401 if let Some(project_id) = project.remote_id() {
402 projects.insert(project_id, handle.clone());
403 rejoined_projects.push(proto::RejoinProject {
404 id: project_id,
405 worktrees: project
406 .worktrees(cx)
407 .map(|worktree| {
408 let worktree = worktree.read(cx);
409 proto::RejoinWorktree {
410 id: worktree.id().to_proto(),
411 scan_id: worktree.completed_scan_id() as u64,
412 }
413 })
414 .collect(),
415 });
416 }
417 return true;
418 }
419 false
420 });
421
422 let response = self.client.request(proto::RejoinRoom {
423 id: self.id,
424 reshared_projects,
425 rejoined_projects,
426 });
427
428 cx.spawn(|this, mut cx| async move {
429 let response = response.await?;
430 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
431 this.update(&mut cx, |this, cx| {
432 this.status = RoomStatus::Online;
433 this.apply_room_update(room_proto, cx)?;
434
435 for reshared_project in response.reshared_projects {
436 if let Some(project) = projects.get(&reshared_project.id) {
437 project.update(cx, |project, cx| {
438 project.reshared(reshared_project, cx).log_err();
439 });
440 }
441 }
442
443 for rejoined_project in response.rejoined_projects {
444 if let Some(project) = projects.get(&rejoined_project.id) {
445 project.update(cx, |project, cx| {
446 project.rejoined(rejoined_project, cx).log_err();
447 });
448 }
449 }
450
451 anyhow::Ok(())
452 })
453 })
454 }
455
    /// Returns the identifier of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
459
    /// Returns the current connectivity status of this room.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
463
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
467
    /// Returns the remote participants of this room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
471
472 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
473 self.remote_participants
474 .values()
475 .find(|p| p.peer_id == peer_id)
476 }
477
478 pub fn pending_participants(&self) -> &[Arc<User>] {
479 &self.pending_participants
480 }
481
    /// Returns whether `user_id` belongs to a remote or pending participant of
    /// this room, as recorded in `participant_user_ids`.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
485
486 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
487 self.follows_by_leader_id_project_id
488 .get(&(leader_id, project_id))
489 .map_or(&[], |v| v.as_slice())
490 }
491
492 async fn handle_room_updated(
493 this: ModelHandle<Self>,
494 envelope: TypedEnvelope<proto::RoomUpdated>,
495 _: Arc<Client>,
496 mut cx: AsyncAppContext,
497 ) -> Result<()> {
498 let room = envelope
499 .payload
500 .room
501 .ok_or_else(|| anyhow!("invalid room"))?;
502 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
503 }
504
    /// Schedules the application of a room snapshot received from the server.
    ///
    /// The users referenced by the snapshot are requested from the user store
    /// first; once both lookups finished, a task (stored in
    /// `pending_room_update`, replacing any previous one) updates the
    /// participant lists, emits the corresponding [`Event`]s, rebuilds the
    /// follower map, and leaves the room if `should_leave` holds afterwards.
    ///
    /// This function itself only notifies observers and always returns `Ok(())`.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Start both user lookups; they are awaited together inside the task below.
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                // If the user lookup failed, the error is logged and the remote
                // participants are left as they were.
                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): pairing by position assumes `get_users` returns the
                    // users in the same order as the requested ids — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        // Participants without a peer id are skipped entirely.
                        let Some(peer_id) = participant.peer_id else { continue };
                        this.participant_user_ids.insert(participant.user_id);

                        // Project ids known for this participant before and after the update.
                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        for unshared_project_id in old_projects.difference(&new_projects) {
                            // Disconnect and forget any joined project matching the unshared
                            // id; joined projects whose handle is gone are forgotten as well.
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade(cx) {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // A location that cannot be converted is treated as external.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            remote_participant.projects = participant.projects;
                            remote_participant.peer_id = peer_id;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    tracks: Default::default(),
                                },
                            );

                            // Register the video tracks LiveKit already reports for the
                            // newly added participant.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                for track in tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants that are no longer part of the room and announce
                    // their projects as unshared.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower map from scratch, skipping incomplete entries
                // and duplicate followers.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_insert(Vec::new());
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the pending marker before `should_leave`, which requires that
                // no room update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
693
694 fn remote_video_track_updated(
695 &mut self,
696 change: RemoteVideoTrackUpdate,
697 cx: &mut ModelContext<Self>,
698 ) -> Result<()> {
699 match change {
700 RemoteVideoTrackUpdate::Subscribed(track) => {
701 let user_id = track.publisher_id().parse()?;
702 let track_id = track.sid().to_string();
703 let participant = self
704 .remote_participants
705 .get_mut(&user_id)
706 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
707 participant.tracks.insert(
708 track_id.clone(),
709 Arc::new(RemoteVideoTrack {
710 live_kit_track: track,
711 }),
712 );
713 cx.emit(Event::RemoteVideoTracksChanged {
714 participant_id: participant.peer_id,
715 });
716 }
717 RemoteVideoTrackUpdate::Unsubscribed {
718 publisher_id,
719 track_id,
720 } => {
721 let user_id = publisher_id.parse()?;
722 let participant = self
723 .remote_participants
724 .get_mut(&user_id)
725 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
726 participant.tracks.remove(&track_id);
727 cx.emit(Event::RemoteVideoTracksChanged {
728 participant_id: participant.peer_id,
729 });
730 }
731 }
732
733 cx.notify();
734 Ok(())
735 }
736
    /// Asserts the consistency of the participant bookkeeping. The checks are
    /// only compiled in tests or with the `test-support` feature; otherwise
    /// this function does nothing.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            // Every remote participant is tracked by user id and is never the local user.
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            // The same holds for every pending participant.
            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // No user id is tracked that is neither remote nor pending.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
756
757 pub(crate) fn call(
758 &mut self,
759 called_user_id: u64,
760 initial_project_id: Option<u64>,
761 cx: &mut ModelContext<Self>,
762 ) -> Task<Result<()>> {
763 if self.status.is_offline() {
764 return Task::ready(Err(anyhow!("room is offline")));
765 }
766
767 cx.notify();
768 let client = self.client.clone();
769 let room_id = self.id;
770 self.pending_call_count += 1;
771 cx.spawn(|this, mut cx| async move {
772 let result = client
773 .request(proto::Call {
774 room_id,
775 called_user_id,
776 initial_project_id,
777 })
778 .await;
779 this.update(&mut cx, |this, cx| {
780 this.pending_call_count -= 1;
781 if this.should_leave() {
782 this.leave(cx).detach_and_log_err(cx);
783 }
784 });
785 result?;
786 Ok(())
787 })
788 }
789
790 pub fn join_project(
791 &mut self,
792 id: u64,
793 language_registry: Arc<LanguageRegistry>,
794 fs: Arc<dyn Fs>,
795 cx: &mut ModelContext<Self>,
796 ) -> Task<Result<ModelHandle<Project>>> {
797 let client = self.client.clone();
798 let user_store = self.user_store.clone();
799 cx.spawn(|this, mut cx| async move {
800 let project =
801 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
802 this.update(&mut cx, |this, cx| {
803 this.joined_projects.retain(|project| {
804 if let Some(project) = project.upgrade(cx) {
805 !project.read(cx).is_read_only()
806 } else {
807 false
808 }
809 });
810 this.joined_projects.insert(project.downgrade());
811 });
812 Ok(project)
813 })
814 }
815
816 pub(crate) fn share_project(
817 &mut self,
818 project: ModelHandle<Project>,
819 cx: &mut ModelContext<Self>,
820 ) -> Task<Result<u64>> {
821 if let Some(project_id) = project.read(cx).remote_id() {
822 return Task::ready(Ok(project_id));
823 }
824
825 let request = self.client.request(proto::ShareProject {
826 room_id: self.id(),
827 worktrees: project.read(cx).worktree_metadata_protos(cx),
828 });
829 cx.spawn(|this, mut cx| async move {
830 let response = request.await?;
831
832 project.update(&mut cx, |project, cx| {
833 project.shared(response.project_id, cx)
834 })?;
835
836 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
837 this.update(&mut cx, |this, cx| {
838 this.shared_projects.insert(project.downgrade());
839 let active_project = this.local_participant.active_project.as_ref();
840 if active_project.map_or(false, |location| *location == project) {
841 this.set_location(Some(&project), cx)
842 } else {
843 Task::ready(Ok(()))
844 }
845 })
846 .await?;
847
848 Ok(response.project_id)
849 })
850 }
851
852 pub(crate) fn unshare_project(
853 &mut self,
854 project: ModelHandle<Project>,
855 cx: &mut ModelContext<Self>,
856 ) -> Result<()> {
857 let project_id = match project.read(cx).remote_id() {
858 Some(project_id) => project_id,
859 None => return Ok(()),
860 };
861
862 self.client.send(proto::UnshareProject { project_id })?;
863 project.update(cx, |this, cx| this.unshare(cx))
864 }
865
866 pub(crate) fn set_location(
867 &mut self,
868 project: Option<&ModelHandle<Project>>,
869 cx: &mut ModelContext<Self>,
870 ) -> Task<Result<()>> {
871 if self.status.is_offline() {
872 return Task::ready(Err(anyhow!("room is offline")));
873 }
874
875 let client = self.client.clone();
876 let room_id = self.id;
877 let location = if let Some(project) = project {
878 self.local_participant.active_project = Some(project.downgrade());
879 if let Some(project_id) = project.read(cx).remote_id() {
880 proto::participant_location::Variant::SharedProject(
881 proto::participant_location::SharedProject { id: project_id },
882 )
883 } else {
884 proto::participant_location::Variant::UnsharedProject(
885 proto::participant_location::UnsharedProject {},
886 )
887 }
888 } else {
889 self.local_participant.active_project = None;
890 proto::participant_location::Variant::External(proto::participant_location::External {})
891 };
892
893 cx.notify();
894 cx.foreground().spawn(async move {
895 client
896 .request(proto::UpdateParticipantLocation {
897 room_id,
898 location: Some(proto::ParticipantLocation {
899 variant: Some(location),
900 }),
901 })
902 .await?;
903 Ok(())
904 })
905 }
906
907 pub fn is_screen_sharing(&self) -> bool {
908 self.live_kit.as_ref().map_or(false, |live_kit| {
909 !matches!(live_kit.screen_track, ScreenTrack::None)
910 })
911 }
912
913 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
914 if self.status.is_offline() {
915 return Task::ready(Err(anyhow!("room is offline")));
916 } else if self.is_screen_sharing() {
917 return Task::ready(Err(anyhow!("screen was already shared")));
918 }
919
920 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
921 let publish_id = post_inc(&mut live_kit.next_publish_id);
922 live_kit.screen_track = ScreenTrack::Pending { publish_id };
923 cx.notify();
924 (live_kit.room.display_sources(), publish_id)
925 } else {
926 return Task::ready(Err(anyhow!("live-kit was not initialized")));
927 };
928
929 cx.spawn_weak(|this, mut cx| async move {
930 let publish_track = async {
931 let displays = displays.await?;
932 let display = displays
933 .first()
934 .ok_or_else(|| anyhow!("no display found"))?;
935 let track = LocalVideoTrack::screen_share_for_display(&display);
936 this.upgrade(&cx)
937 .ok_or_else(|| anyhow!("room was dropped"))?
938 .read_with(&cx, |this, _| {
939 this.live_kit
940 .as_ref()
941 .map(|live_kit| live_kit.room.publish_video_track(&track))
942 })
943 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
944 .await
945 };
946
947 let publication = publish_track.await;
948 this.upgrade(&cx)
949 .ok_or_else(|| anyhow!("room was dropped"))?
950 .update(&mut cx, |this, cx| {
951 let live_kit = this
952 .live_kit
953 .as_mut()
954 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
955
956 let canceled = if let ScreenTrack::Pending {
957 publish_id: cur_publish_id,
958 } = &live_kit.screen_track
959 {
960 *cur_publish_id != publish_id
961 } else {
962 true
963 };
964
965 match publication {
966 Ok(publication) => {
967 if canceled {
968 live_kit.room.unpublish_track(publication);
969 } else {
970 live_kit.screen_track = ScreenTrack::Published(publication);
971 cx.notify();
972 }
973 Ok(())
974 }
975 Err(error) => {
976 if canceled {
977 Ok(())
978 } else {
979 live_kit.screen_track = ScreenTrack::None;
980 cx.notify();
981 Err(error)
982 }
983 }
984 }
985 })
986 })
987 }
988
989 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
990 if self.status.is_offline() {
991 return Err(anyhow!("room is offline"));
992 }
993
994 let live_kit = self
995 .live_kit
996 .as_mut()
997 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
998 match mem::take(&mut live_kit.screen_track) {
999 ScreenTrack::None => Err(anyhow!("screen was not shared")),
1000 ScreenTrack::Pending { .. } => {
1001 cx.notify();
1002 Ok(())
1003 }
1004 ScreenTrack::Published(track) => {
1005 live_kit.room.unpublish_track(track);
1006 cx.notify();
1007 Ok(())
1008 }
1009 }
1010 }
1011
1012 #[cfg(any(test, feature = "test-support"))]
1013 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1014 self.live_kit
1015 .as_ref()
1016 .unwrap()
1017 .room
1018 .set_display_sources(sources);
1019 }
1020}
1021
/// LiveKit-related state owned by a [`Room`].
struct LiveKitRoom {
    room: Arc<live_kit_client::Room>,
    /// State of the local screen share.
    screen_track: ScreenTrack,
    /// Id handed to the next screen-share attempt; incremented per attempt.
    next_publish_id: usize,
    /// Task that leaves the room once LiveKit reports a disconnection.
    /// Held only so that it lives as long as this struct.
    _maintain_room: Task<()>,
    /// Task that forwards remote video track updates to the room.
    /// Held only so that it lives as long as this struct.
    _maintain_tracks: Task<()>,
}
1029
1030enum ScreenTrack {
1031 None,
1032 Pending { publish_id: usize },
1033 Published(LocalTrackPublication),
1034}
1035
1036impl Default for ScreenTrack {
1037 fn default() -> Self {
1038 Self::None
1039 }
1040}
1041
/// Connectivity state of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined and connected.
    Online,
    /// The client connection was interrupted and a rejoin is being attempted.
    Rejoining,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; note that
    /// [`RoomStatus::Rejoining`] is neither online nor offline.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}