1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{
14 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
15};
16use language::LanguageRegistry;
17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
18use postage::stream::Stream;
19use project::Project;
20use std::{mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
/// How long `Room::maintain_connection` waits for the client to reconnect and
/// rejoin the room before it gives up and leaves the room. Set to the client's
/// `RECEIVE_TIMEOUT`.
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
24
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a different location.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's video track was subscribed or unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant now lists a project that was not in their
    /// previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project disappeared from a remote participant's project list, or the
    /// participant that listed it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room (emitted by `Room::leave`).
    Left,
}
43
/// Client-side state of a call room.
pub struct Room {
    /// Room id used in requests to the server.
    id: u64,
    /// LiveKit state; `None` when no connection info was supplied at
    /// construction, and cleared by `leave`.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    /// Projects registered by `share_project`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Projects registered by `join_project`.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending in the most recently applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have not completed yet.
    pending_call_count: usize,
    /// When set, the room is left automatically once `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers of each leader, rebuilt from every applied room update.
    follows_by_leader_id: HashMap<PeerId, Vec<PeerId>>,
    /// Message-handler subscriptions; cleared by `leave`.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, if one is still running.
    pending_room_update: Option<Task<()>>,
    /// Task running `maintain_connection`; dropped by `leave`.
    maintain_connection: Option<Task<Option<()>>>,
}
63
64impl Entity for Room {
65 type Event = Event;
66
67 fn release(&mut self, _: &mut MutableAppContext) {
68 if self.status.is_online() {
69 log::info!("room was released, sending leave message");
70 let _ = self.client.send(proto::LeaveRoom {});
71 }
72 }
73}
74
75impl Room {
/// Builds the in-memory state for room `id` and starts its background tasks.
///
/// When `live_kit_connection_info` is provided, a LiveKit room is created and
/// a connection to it is started on the foreground executor; otherwise
/// `live_kit` stays `None`. The returned room starts out `RoomStatus::Online`,
/// registers `handle_room_updated` as a message handler on `client`, and
/// spawns the `maintain_connection` task.
fn new(
    id: u64,
    live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    cx: &mut ModelContext<Self>,
) -> Self {
    let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
        let room = live_kit_client::Room::new();
        let mut status = room.status();
        // Consume the initial status of the room.
        let _ = status.try_recv();
        // Leave the room as soon as LiveKit reports `Disconnected`. Only a weak
        // handle is held, so the loop ends once the room model can no longer be
        // upgraded.
        let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
            while let Some(status) = status.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                if status == live_kit_client::ConnectionState::Disconnected {
                    this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                    break;
                }
            }
        });

        // Forward every remote video track change to `remote_video_track_updated`;
        // errors are logged and do not stop the loop.
        let mut track_changes = room.remote_video_track_updates();
        let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
            while let Some(track_change) = track_changes.next().await {
                let this = if let Some(this) = this.upgrade(&cx) {
                    this
                } else {
                    break;
                };

                this.update(&mut cx, |this, cx| {
                    this.remote_video_track_updated(track_change, cx).log_err()
                });
            }
        });

        // Connect in the background; a connection error is only logged.
        cx.foreground()
            .spawn(room.connect(&connection_info.server_url, &connection_info.token))
            .detach_and_log_err(cx);

        Some(LiveKitRoom {
            room,
            screen_track: ScreenTrack::None,
            next_publish_id: 0,
            _maintain_room,
            _maintain_tracks,
        })
    } else {
        None
    };

    let maintain_connection =
        cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

    Self {
        id,
        live_kit: live_kit_room,
        status: RoomStatus::Online,
        shared_projects: Default::default(),
        joined_projects: Default::default(),
        participant_user_ids: Default::default(),
        local_participant: Default::default(),
        remote_participants: Default::default(),
        pending_participants: Default::default(),
        pending_call_count: 0,
        subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
        leave_when_empty: false,
        pending_room_update: None,
        client,
        user_store,
        follows_by_leader_id: Default::default(),
        maintain_connection: Some(maintain_connection),
    }
}
156
157 pub(crate) fn create(
158 called_user_id: u64,
159 initial_project: Option<ModelHandle<Project>>,
160 client: Arc<Client>,
161 user_store: ModelHandle<UserStore>,
162 cx: &mut MutableAppContext,
163 ) -> Task<Result<ModelHandle<Self>>> {
164 cx.spawn(|mut cx| async move {
165 let response = client.request(proto::CreateRoom {}).await?;
166 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
167 let room = cx.add_model(|cx| {
168 Self::new(
169 room_proto.id,
170 response.live_kit_connection_info,
171 client,
172 user_store,
173 cx,
174 )
175 });
176
177 let initial_project_id = if let Some(initial_project) = initial_project {
178 let initial_project_id = room
179 .update(&mut cx, |room, cx| {
180 room.share_project(initial_project.clone(), cx)
181 })
182 .await?;
183 Some(initial_project_id)
184 } else {
185 None
186 };
187
188 match room
189 .update(&mut cx, |room, cx| {
190 room.leave_when_empty = true;
191 room.call(called_user_id, initial_project_id, cx)
192 })
193 .await
194 {
195 Ok(()) => Ok(room),
196 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
197 }
198 })
199 }
200
201 pub(crate) fn join(
202 call: &IncomingCall,
203 client: Arc<Client>,
204 user_store: ModelHandle<UserStore>,
205 cx: &mut MutableAppContext,
206 ) -> Task<Result<ModelHandle<Self>>> {
207 let room_id = call.room_id;
208 cx.spawn(|mut cx| async move {
209 let response = client.request(proto::JoinRoom { id: room_id }).await?;
210 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
211 let room = cx.add_model(|cx| {
212 Self::new(
213 room_id,
214 response.live_kit_connection_info,
215 client,
216 user_store,
217 cx,
218 )
219 });
220 room.update(&mut cx, |room, cx| {
221 room.leave_when_empty = true;
222 room.apply_room_update(room_proto, cx)?;
223 anyhow::Ok(())
224 })?;
225 Ok(room)
226 })
227 }
228
229 fn should_leave(&self) -> bool {
230 self.leave_when_empty
231 && self.pending_room_update.is_none()
232 && self.pending_participants.is_empty()
233 && self.remote_participants.is_empty()
234 && self.pending_call_count == 0
235 }
236
237 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
238 if self.status.is_offline() {
239 return Err(anyhow!("room is offline"));
240 }
241
242 cx.notify();
243 cx.emit(Event::Left);
244 log::info!("leaving room");
245
246 for project in self.shared_projects.drain() {
247 if let Some(project) = project.upgrade(cx) {
248 project.update(cx, |project, cx| {
249 project.unshare(cx).log_err();
250 });
251 }
252 }
253 for project in self.joined_projects.drain() {
254 if let Some(project) = project.upgrade(cx) {
255 project.update(cx, |project, cx| {
256 project.disconnected_from_host(cx);
257 });
258 }
259 }
260
261 self.status = RoomStatus::Offline;
262 self.remote_participants.clear();
263 self.pending_participants.clear();
264 self.participant_user_ids.clear();
265 self.subscriptions.clear();
266 self.live_kit.take();
267 self.pending_room_update.take();
268 self.maintain_connection.take();
269 self.client.send(proto::LeaveRoom {})?;
270 Ok(())
271 }
272
/// Watches the client's connection status and tries to rejoin the room after
/// a disconnection.
///
/// On a detected disconnection the room is marked `Rejoining`. The task then
/// waits up to `RECONNECT_TIMEOUT` for the client to reconnect and for
/// `rejoin` to succeed (at most 3 rejoin attempts). On success it goes back
/// to watching; otherwise it leaves the room and returns an error. It also
/// returns an error if the room model is dropped at the moment a
/// disconnection is detected.
async fn maintain_connection(
    this: WeakModelHandle<Self>,
    client: Arc<Client>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // A closed status stream is treated as "not connected".
        let is_connected = client_status
            .next()
            .await
            .map_or(false, |s| s.is_connected());

        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");
            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                });

            // Wait for client to re-establish a connection to the server.
            {
                let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                // Resolves to `true` once a rejoin succeeded, and to `false` when
                // the status stream ended, the room was dropped, or all attempts
                // were used up. Only failed rejoins count as attempts; status
                // changes that are not "connected" are skipped.
                let client_reconnection = async {
                    let mut remaining_attempts = 3;
                    while remaining_attempts > 0 {
                        log::info!(
                            "waiting for client status change, remaining attempts {}",
                            remaining_attempts
                        );
                        let Some(status) = client_status.next().await else { break };
                        if status.is_connected() {
                            log::info!("client reconnected, attempting to rejoin room");

                            let Some(this) = this.upgrade(&cx) else { break };
                            if this
                                .update(&mut cx, |this, cx| this.rejoin(cx))
                                .await
                                .log_err()
                                .is_some()
                            {
                                return true;
                            } else {
                                remaining_attempts -= 1;
                            }
                        }
                    }
                    false
                }
                .fuse();
                futures::pin_mut!(client_reconnection);

                // Race the reconnection against the timeout; the reconnection
                // branch is listed first, so it is preferred when both are ready.
                futures::select_biased! {
                    reconnected = client_reconnection => {
                        if reconnected {
                            log::info!("successfully reconnected to room");
                            // If we successfully joined the room, go back around the loop
                            // waiting for future connection status changes.
                            continue;
                        }
                    }
                    _ = reconnection_timeout => {
                        log::info!("room reconnection timeout expired");
                    }
                }
            }

            // The client failed to re-establish a connection to the server
            // or an error occurred while trying to re-join the room. Either way
            // we leave the room and return an error.
            if let Some(this) = this.upgrade(&cx) {
                log::info!("reconnection failed, leaving room");
                let _ = this.update(&mut cx, |this, cx| this.leave(cx));
            }
            return Err(anyhow!(
                "can't reconnect to room: client failed to re-establish connection"
            ));
        }
    }
}
355
356 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
357 let mut projects = HashMap::default();
358 let mut reshared_projects = Vec::new();
359 let mut rejoined_projects = Vec::new();
360 self.shared_projects.retain(|project| {
361 if let Some(handle) = project.upgrade(cx) {
362 let project = handle.read(cx);
363 if let Some(project_id) = project.remote_id() {
364 projects.insert(project_id, handle.clone());
365 reshared_projects.push(proto::UpdateProject {
366 project_id,
367 worktrees: project.worktree_metadata_protos(cx),
368 });
369 return true;
370 }
371 }
372 false
373 });
374 self.joined_projects.retain(|project| {
375 if let Some(handle) = project.upgrade(cx) {
376 let project = handle.read(cx);
377 if let Some(project_id) = project.remote_id() {
378 projects.insert(project_id, handle.clone());
379 rejoined_projects.push(proto::RejoinProject {
380 id: project_id,
381 worktrees: project
382 .worktrees(cx)
383 .map(|worktree| {
384 let worktree = worktree.read(cx);
385 proto::RejoinWorktree {
386 id: worktree.id().to_proto(),
387 scan_id: worktree.completed_scan_id() as u64,
388 }
389 })
390 .collect(),
391 });
392 }
393 return true;
394 }
395 false
396 });
397
398 let response = self.client.request(proto::RejoinRoom {
399 id: self.id,
400 reshared_projects,
401 rejoined_projects,
402 });
403
404 cx.spawn(|this, mut cx| async move {
405 let response = response.await?;
406 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
407 this.update(&mut cx, |this, cx| {
408 this.status = RoomStatus::Online;
409 this.apply_room_update(room_proto, cx)?;
410
411 for reshared_project in response.reshared_projects {
412 if let Some(project) = projects.get(&reshared_project.id) {
413 project.update(cx, |project, cx| {
414 project.reshared(reshared_project, cx).log_err();
415 });
416 }
417 }
418
419 for rejoined_project in response.rejoined_projects {
420 if let Some(project) = projects.get(&rejoined_project.id) {
421 project.update(cx, |project, cx| {
422 project.rejoined(rejoined_project, cx).log_err();
423 });
424 }
425 }
426
427 anyhow::Ok(())
428 })
429 })
430 }
431
/// Returns the id of this room.
pub fn id(&self) -> u64 {
    self.id
}
435
/// Returns the current connection status of the room.
pub fn status(&self) -> RoomStatus {
    self.status
}
439
/// Returns the state tracked for the local user.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
443
/// Returns the remote participants currently in the room, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
447
448 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
449 self.remote_participants
450 .values()
451 .find(|p| p.peer_id == peer_id)
452 }
453
454 pub fn pending_participants(&self) -> &[Arc<User>] {
455 &self.pending_participants
456 }
457
458 pub fn contains_participant(&self, user_id: u64) -> bool {
459 self.participant_user_ids.contains(&user_id)
460 }
461
462 pub fn followers_for(&self, leader_id: PeerId) -> &[PeerId] {
463 self.follows_by_leader_id
464 .get(&leader_id)
465 .map_or(&[], |v| v.as_slice())
466 }
467
468 async fn handle_room_updated(
469 this: ModelHandle<Self>,
470 envelope: TypedEnvelope<proto::RoomUpdated>,
471 _: Arc<Client>,
472 mut cx: AsyncAppContext,
473 ) -> Result<()> {
474 let room = envelope
475 .payload
476 .room
477 .ok_or_else(|| anyhow!("invalid room"))?;
478 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
479 }
480
481 fn apply_room_update(
482 &mut self,
483 mut room: proto::Room,
484 cx: &mut ModelContext<Self>,
485 ) -> Result<()> {
486 // Filter ourselves out from the room's participants.
487 let local_participant_ix = room
488 .participants
489 .iter()
490 .position(|participant| Some(participant.user_id) == self.client.user_id());
491 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
492
493 let pending_participant_user_ids = room
494 .pending_participants
495 .iter()
496 .map(|p| p.user_id)
497 .collect::<Vec<_>>();
498
499 let remote_participant_user_ids = room
500 .participants
501 .iter()
502 .map(|p| p.user_id)
503 .collect::<Vec<_>>();
504
505 let (remote_participants, pending_participants) =
506 self.user_store.update(cx, move |user_store, cx| {
507 (
508 user_store.get_users(remote_participant_user_ids, cx),
509 user_store.get_users(pending_participant_user_ids, cx),
510 )
511 });
512
513 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
514 let (remote_participants, pending_participants) =
515 futures::join!(remote_participants, pending_participants);
516
517 this.update(&mut cx, |this, cx| {
518 this.participant_user_ids.clear();
519
520 if let Some(participant) = local_participant {
521 this.local_participant.projects = participant.projects;
522 } else {
523 this.local_participant.projects.clear();
524 }
525
526 if let Some(participants) = remote_participants.log_err() {
527 for (participant, user) in room.participants.into_iter().zip(participants) {
528 let Some(peer_id) = participant.peer_id else { continue };
529 this.participant_user_ids.insert(participant.user_id);
530
531 let old_projects = this
532 .remote_participants
533 .get(&participant.user_id)
534 .into_iter()
535 .flat_map(|existing| &existing.projects)
536 .map(|project| project.id)
537 .collect::<HashSet<_>>();
538 let new_projects = participant
539 .projects
540 .iter()
541 .map(|project| project.id)
542 .collect::<HashSet<_>>();
543
544 for project in &participant.projects {
545 if !old_projects.contains(&project.id) {
546 cx.emit(Event::RemoteProjectShared {
547 owner: user.clone(),
548 project_id: project.id,
549 worktree_root_names: project.worktree_root_names.clone(),
550 });
551 }
552 }
553
554 for unshared_project_id in old_projects.difference(&new_projects) {
555 this.joined_projects.retain(|project| {
556 if let Some(project) = project.upgrade(cx) {
557 project.update(cx, |project, cx| {
558 if project.remote_id() == Some(*unshared_project_id) {
559 project.disconnected_from_host(cx);
560 false
561 } else {
562 true
563 }
564 })
565 } else {
566 false
567 }
568 });
569 cx.emit(Event::RemoteProjectUnshared {
570 project_id: *unshared_project_id,
571 });
572 }
573
574 let location = ParticipantLocation::from_proto(participant.location)
575 .unwrap_or(ParticipantLocation::External);
576 if let Some(remote_participant) =
577 this.remote_participants.get_mut(&participant.user_id)
578 {
579 remote_participant.projects = participant.projects;
580 remote_participant.peer_id = peer_id;
581 if location != remote_participant.location {
582 remote_participant.location = location;
583 cx.emit(Event::ParticipantLocationChanged {
584 participant_id: peer_id,
585 });
586 }
587 } else {
588 this.remote_participants.insert(
589 participant.user_id,
590 RemoteParticipant {
591 user: user.clone(),
592 peer_id,
593 projects: participant.projects,
594 location,
595 tracks: Default::default(),
596 },
597 );
598
599 if let Some(live_kit) = this.live_kit.as_ref() {
600 let tracks =
601 live_kit.room.remote_video_tracks(&peer_id.to_string());
602 for track in tracks {
603 this.remote_video_track_updated(
604 RemoteVideoTrackUpdate::Subscribed(track),
605 cx,
606 )
607 .log_err();
608 }
609 }
610 }
611 }
612
613 this.remote_participants.retain(|user_id, participant| {
614 if this.participant_user_ids.contains(user_id) {
615 true
616 } else {
617 for project in &participant.projects {
618 cx.emit(Event::RemoteProjectUnshared {
619 project_id: project.id,
620 });
621 }
622 false
623 }
624 });
625 }
626
627 if let Some(pending_participants) = pending_participants.log_err() {
628 this.pending_participants = pending_participants;
629 for participant in &this.pending_participants {
630 this.participant_user_ids.insert(participant.id);
631 }
632 }
633
634 this.follows_by_leader_id.clear();
635 for follower in room.followers {
636 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
637 (Some(leader), Some(follower)) => (leader, follower),
638
639 _ => {
640 log::error!("Follower message {follower:?} missing some state");
641 continue;
642 }
643 };
644
645 let list = this
646 .follows_by_leader_id
647 .entry(leader)
648 .or_insert(Vec::new());
649 if !list.contains(&follower) {
650 list.push(follower);
651 }
652 }
653
654 this.pending_room_update.take();
655 if this.should_leave() {
656 log::info!("room is empty, leaving");
657 let _ = this.leave(cx);
658 }
659
660 this.check_invariants();
661 cx.notify();
662 });
663 }));
664
665 cx.notify();
666 Ok(())
667 }
668
669 fn remote_video_track_updated(
670 &mut self,
671 change: RemoteVideoTrackUpdate,
672 cx: &mut ModelContext<Self>,
673 ) -> Result<()> {
674 match change {
675 RemoteVideoTrackUpdate::Subscribed(track) => {
676 let user_id = track.publisher_id().parse()?;
677 let track_id = track.sid().to_string();
678 let participant = self
679 .remote_participants
680 .get_mut(&user_id)
681 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
682 participant.tracks.insert(
683 track_id.clone(),
684 Arc::new(RemoteVideoTrack {
685 live_kit_track: track,
686 }),
687 );
688 cx.emit(Event::RemoteVideoTracksChanged {
689 participant_id: participant.peer_id,
690 });
691 }
692 RemoteVideoTrackUpdate::Unsubscribed {
693 publisher_id,
694 track_id,
695 } => {
696 let user_id = publisher_id.parse()?;
697 let participant = self
698 .remote_participants
699 .get_mut(&user_id)
700 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
701 participant.tracks.remove(&track_id);
702 cx.emit(Event::RemoteVideoTracksChanged {
703 participant_id: participant.peer_id,
704 });
705 }
706 }
707
708 cx.notify();
709 Ok(())
710 }
711
/// Asserts the participant bookkeeping is consistent. The checks are only
/// compiled for tests / the `test-support` feature; otherwise this is a no-op.
///
/// Invariants: every remote and pending participant is recorded in
/// `participant_user_ids`, none of them is the local user, and
/// `participant_user_ids` holds no other ids.
fn check_invariants(&self) {
    #[cfg(any(test, feature = "test-support"))]
    {
        for participant in self.remote_participants.values() {
            assert!(self.participant_user_ids.contains(&participant.user.id));
            assert_ne!(participant.user.id, self.client.user_id().unwrap());
        }

        for participant in &self.pending_participants {
            assert!(self.participant_user_ids.contains(&participant.id));
            assert_ne!(participant.id, self.client.user_id().unwrap());
        }

        // No ids beyond those of the remote and pending participants.
        assert_eq!(
            self.participant_user_ids.len(),
            self.remote_participants.len() + self.pending_participants.len()
        );
    }
}
731
732 pub(crate) fn call(
733 &mut self,
734 called_user_id: u64,
735 initial_project_id: Option<u64>,
736 cx: &mut ModelContext<Self>,
737 ) -> Task<Result<()>> {
738 if self.status.is_offline() {
739 return Task::ready(Err(anyhow!("room is offline")));
740 }
741
742 cx.notify();
743 let client = self.client.clone();
744 let room_id = self.id;
745 self.pending_call_count += 1;
746 cx.spawn(|this, mut cx| async move {
747 let result = client
748 .request(proto::Call {
749 room_id,
750 called_user_id,
751 initial_project_id,
752 })
753 .await;
754 this.update(&mut cx, |this, cx| {
755 this.pending_call_count -= 1;
756 if this.should_leave() {
757 this.leave(cx)?;
758 }
759 result
760 })?;
761 Ok(())
762 })
763 }
764
765 pub fn join_project(
766 &mut self,
767 id: u64,
768 language_registry: Arc<LanguageRegistry>,
769 fs: Arc<dyn Fs>,
770 cx: &mut ModelContext<Self>,
771 ) -> Task<Result<ModelHandle<Project>>> {
772 let client = self.client.clone();
773 let user_store = self.user_store.clone();
774 cx.spawn(|this, mut cx| async move {
775 let project =
776 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
777 this.update(&mut cx, |this, cx| {
778 this.joined_projects.retain(|project| {
779 if let Some(project) = project.upgrade(cx) {
780 !project.read(cx).is_read_only()
781 } else {
782 false
783 }
784 });
785 this.joined_projects.insert(project.downgrade());
786 });
787 Ok(project)
788 })
789 }
790
791 pub(crate) fn share_project(
792 &mut self,
793 project: ModelHandle<Project>,
794 cx: &mut ModelContext<Self>,
795 ) -> Task<Result<u64>> {
796 if let Some(project_id) = project.read(cx).remote_id() {
797 return Task::ready(Ok(project_id));
798 }
799
800 let request = self.client.request(proto::ShareProject {
801 room_id: self.id(),
802 worktrees: project.read(cx).worktree_metadata_protos(cx),
803 });
804 cx.spawn(|this, mut cx| async move {
805 let response = request.await?;
806
807 project.update(&mut cx, |project, cx| {
808 project.shared(response.project_id, cx)
809 })?;
810
811 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
812 this.update(&mut cx, |this, cx| {
813 this.shared_projects.insert(project.downgrade());
814 let active_project = this.local_participant.active_project.as_ref();
815 if active_project.map_or(false, |location| *location == project) {
816 this.set_location(Some(&project), cx)
817 } else {
818 Task::ready(Ok(()))
819 }
820 })
821 .await?;
822
823 Ok(response.project_id)
824 })
825 }
826
827 pub(crate) fn unshare_project(
828 &mut self,
829 project: ModelHandle<Project>,
830 cx: &mut ModelContext<Self>,
831 ) -> Result<()> {
832 let project_id = match project.read(cx).remote_id() {
833 Some(project_id) => project_id,
834 None => return Ok(()),
835 };
836
837 self.client.send(proto::UnshareProject { project_id })?;
838 project.update(cx, |this, cx| this.unshare(cx))
839 }
840
841 pub(crate) fn set_location(
842 &mut self,
843 project: Option<&ModelHandle<Project>>,
844 cx: &mut ModelContext<Self>,
845 ) -> Task<Result<()>> {
846 if self.status.is_offline() {
847 return Task::ready(Err(anyhow!("room is offline")));
848 }
849
850 let client = self.client.clone();
851 let room_id = self.id;
852 let location = if let Some(project) = project {
853 self.local_participant.active_project = Some(project.downgrade());
854 if let Some(project_id) = project.read(cx).remote_id() {
855 proto::participant_location::Variant::SharedProject(
856 proto::participant_location::SharedProject { id: project_id },
857 )
858 } else {
859 proto::participant_location::Variant::UnsharedProject(
860 proto::participant_location::UnsharedProject {},
861 )
862 }
863 } else {
864 self.local_participant.active_project = None;
865 proto::participant_location::Variant::External(proto::participant_location::External {})
866 };
867
868 cx.notify();
869 cx.foreground().spawn(async move {
870 client
871 .request(proto::UpdateParticipantLocation {
872 room_id,
873 location: Some(proto::ParticipantLocation {
874 variant: Some(location),
875 }),
876 })
877 .await?;
878 Ok(())
879 })
880 }
881
882 pub fn is_screen_sharing(&self) -> bool {
883 self.live_kit.as_ref().map_or(false, |live_kit| {
884 !matches!(live_kit.screen_track, ScreenTrack::None)
885 })
886 }
887
/// Starts sharing the first available display as a video track.
///
/// The screen track is marked `Pending` with a fresh publish id right away.
/// When publishing finishes, the result is only applied if the track is still
/// pending with that same id; otherwise the share is treated as canceled and
/// any publication that was created is unpublished again.
///
/// # Errors
/// Fails immediately if the room is offline, a screen share is already
/// pending or published, or LiveKit is not initialized. The returned task
/// fails if the room is dropped, LiveKit goes away, or publishing fails
/// without having been canceled.
pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    if self.status.is_offline() {
        return Task::ready(Err(anyhow!("room is offline")));
    } else if self.is_screen_sharing() {
        return Task::ready(Err(anyhow!("screen was already shared")));
    }

    let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
        // The id ties this attempt to the `Pending` state it sets, so the
        // completion below can tell whether the state was changed meanwhile.
        let publish_id = post_inc(&mut live_kit.next_publish_id);
        live_kit.screen_track = ScreenTrack::Pending { publish_id };
        cx.notify();
        (live_kit.room.display_sources(), publish_id)
    } else {
        return Task::ready(Err(anyhow!("live-kit was not initialized")));
    };

    cx.spawn_weak(|this, mut cx| async move {
        let publish_track = async {
            let displays = displays.await?;
            let display = displays
                .first()
                .ok_or_else(|| anyhow!("no display found"))?;
            let track = LocalVideoTrack::screen_share_for_display(&display);
            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .read_with(&cx, |this, _| {
                    this.live_kit
                        .as_ref()
                        .map(|live_kit| live_kit.room.publish_video_track(&track))
                })
                .ok_or_else(|| anyhow!("live-kit was not initialized"))?
                .await
        };

        // Not propagated with `?` here: a failure must still reset the pending
        // state below.
        let publication = publish_track.await;
        this.upgrade(&cx)
            .ok_or_else(|| anyhow!("room was dropped"))?
            .update(&mut cx, |this, cx| {
                let live_kit = this
                    .live_kit
                    .as_mut()
                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

                // Canceled unless the track is still pending for this very attempt.
                let canceled = if let ScreenTrack::Pending {
                    publish_id: cur_publish_id,
                } = &live_kit.screen_track
                {
                    *cur_publish_id != publish_id
                } else {
                    true
                };

                match publication {
                    Ok(publication) => {
                        if canceled {
                            live_kit.room.unpublish_track(publication);
                        } else {
                            live_kit.screen_track = ScreenTrack::Published(publication);
                            cx.notify();
                        }
                        Ok(())
                    }
                    Err(error) => {
                        if canceled {
                            // The attempt was abandoned; its failure is not reported.
                            Ok(())
                        } else {
                            live_kit.screen_track = ScreenTrack::None;
                            cx.notify();
                            Err(error)
                        }
                    }
                }
            })
    })
}
963
964 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
965 if self.status.is_offline() {
966 return Err(anyhow!("room is offline"));
967 }
968
969 let live_kit = self
970 .live_kit
971 .as_mut()
972 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
973 match mem::take(&mut live_kit.screen_track) {
974 ScreenTrack::None => Err(anyhow!("screen was not shared")),
975 ScreenTrack::Pending { .. } => {
976 cx.notify();
977 Ok(())
978 }
979 ScreenTrack::Published(track) => {
980 live_kit.room.unpublish_track(track);
981 cx.notify();
982 Ok(())
983 }
984 }
985 }
986
/// Test helper: sets the display sources on the LiveKit room.
///
/// # Panics
/// Panics if LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
995}
996
/// LiveKit-related state owned by a [`Room`].
struct LiveKitRoom {
    room: Arc<live_kit_client::Room>,
    /// State of the local screen share.
    screen_track: ScreenTrack,
    /// Id handed to the next screen-share attempt.
    next_publish_id: usize,
    /// Task that leaves the room when LiveKit reports a disconnection; held
    /// here so it is dropped together with this struct.
    _maintain_room: Task<()>,
    /// Task that forwards remote video track updates to the room; held here
    /// so it is dropped together with this struct.
    _maintain_tracks: Task<()>,
}
1004
1005enum ScreenTrack {
1006 None,
1007 Pending { publish_id: usize },
1008 Published(LocalTrackPublication),
1009}
1010
1011impl Default for ScreenTrack {
1012 fn default() -> Self {
1013 Self::None
1014 }
1015}
1016
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    /// Connected to the room.
    Online,
    /// The client connection was lost and a rejoin is being attempted.
    Rejoining,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for `Offline`.
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for `Online`; `Rejoining` counts as neither online
    /// nor offline.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}