1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{
14 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
15};
16use language::LanguageRegistry;
17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
18use postage::stream::Stream;
19use project::Project;
20use std::{mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub enum Event {
27 ParticipantLocationChanged {
28 participant_id: proto::PeerId,
29 },
30 RemoteVideoTracksChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteProjectShared {
34 owner: Arc<User>,
35 project_id: u64,
36 worktree_root_names: Vec<String>,
37 },
38 RemoteProjectUnshared {
39 project_id: u64,
40 },
41 Left,
42}
43
44pub struct Room {
45 id: u64,
46 live_kit: Option<LiveKitRoom>,
47 status: RoomStatus,
48 shared_projects: HashSet<WeakModelHandle<Project>>,
49 joined_projects: HashSet<WeakModelHandle<Project>>,
50 local_participant: LocalParticipant,
51 remote_participants: BTreeMap<u64, RemoteParticipant>,
52 pending_participants: Vec<Arc<User>>,
53 participant_user_ids: HashSet<u64>,
54 pending_call_count: usize,
55 leave_when_empty: bool,
56 client: Arc<Client>,
57 user_store: ModelHandle<UserStore>,
58 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
59 subscriptions: Vec<client::Subscription>,
60 pending_room_update: Option<Task<()>>,
61 maintain_connection: Option<Task<Option<()>>>,
62}
63
64impl Entity for Room {
65 type Event = Event;
66
67 fn release(&mut self, _: &mut MutableAppContext) {
68 if self.status.is_online() {
69 log::info!("room was released, sending leave message");
70 let _ = self.client.send(proto::LeaveRoom {});
71 }
72 }
73}
74
75impl Room {
76 fn new(
77 id: u64,
78 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
79 client: Arc<Client>,
80 user_store: ModelHandle<UserStore>,
81 cx: &mut ModelContext<Self>,
82 ) -> Self {
83 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
84 let room = live_kit_client::Room::new();
85 let mut status = room.status();
86 // Consume the initial status of the room.
87 let _ = status.try_recv();
88 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
89 while let Some(status) = status.next().await {
90 let this = if let Some(this) = this.upgrade(&cx) {
91 this
92 } else {
93 break;
94 };
95
96 if status == live_kit_client::ConnectionState::Disconnected {
97 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
98 break;
99 }
100 }
101 });
102
103 let mut track_changes = room.remote_video_track_updates();
104 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
105 while let Some(track_change) = track_changes.next().await {
106 let this = if let Some(this) = this.upgrade(&cx) {
107 this
108 } else {
109 break;
110 };
111
112 this.update(&mut cx, |this, cx| {
113 this.remote_video_track_updated(track_change, cx).log_err()
114 });
115 }
116 });
117
118 cx.foreground()
119 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
120 .detach_and_log_err(cx);
121
122 Some(LiveKitRoom {
123 room,
124 screen_track: ScreenTrack::None,
125 next_publish_id: 0,
126 _maintain_room,
127 _maintain_tracks,
128 })
129 } else {
130 None
131 };
132
133 let maintain_connection =
134 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
135
136 Self {
137 id,
138 live_kit: live_kit_room,
139 status: RoomStatus::Online,
140 shared_projects: Default::default(),
141 joined_projects: Default::default(),
142 participant_user_ids: Default::default(),
143 local_participant: Default::default(),
144 remote_participants: Default::default(),
145 pending_participants: Default::default(),
146 pending_call_count: 0,
147 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
148 leave_when_empty: false,
149 pending_room_update: None,
150 client,
151 user_store,
152 follows_by_leader_id_project_id: Default::default(),
153 maintain_connection: Some(maintain_connection),
154 }
155 }
156
157 pub(crate) fn create(
158 called_user_id: u64,
159 initial_project: Option<ModelHandle<Project>>,
160 client: Arc<Client>,
161 user_store: ModelHandle<UserStore>,
162 cx: &mut MutableAppContext,
163 ) -> Task<Result<ModelHandle<Self>>> {
164 cx.spawn(|mut cx| async move {
165 let response = client.request(proto::CreateRoom {}).await?;
166 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
167 let room = cx.add_model(|cx| {
168 Self::new(
169 room_proto.id,
170 response.live_kit_connection_info,
171 client,
172 user_store,
173 cx,
174 )
175 });
176
177 let initial_project_id = if let Some(initial_project) = initial_project {
178 let initial_project_id = room
179 .update(&mut cx, |room, cx| {
180 room.share_project(initial_project.clone(), cx)
181 })
182 .await?;
183 Some(initial_project_id)
184 } else {
185 None
186 };
187
188 match room
189 .update(&mut cx, |room, cx| {
190 room.leave_when_empty = true;
191 room.call(called_user_id, initial_project_id, cx)
192 })
193 .await
194 {
195 Ok(()) => Ok(room),
196 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
197 }
198 })
199 }
200
201 pub(crate) fn join(
202 call: &IncomingCall,
203 client: Arc<Client>,
204 user_store: ModelHandle<UserStore>,
205 cx: &mut MutableAppContext,
206 ) -> Task<Result<ModelHandle<Self>>> {
207 let room_id = call.room_id;
208 cx.spawn(|mut cx| async move {
209 let response = client.request(proto::JoinRoom { id: room_id }).await?;
210 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
211 let room = cx.add_model(|cx| {
212 Self::new(
213 room_id,
214 response.live_kit_connection_info,
215 client,
216 user_store,
217 cx,
218 )
219 });
220 room.update(&mut cx, |room, cx| {
221 room.leave_when_empty = true;
222 room.apply_room_update(room_proto, cx)?;
223 anyhow::Ok(())
224 })?;
225 Ok(room)
226 })
227 }
228
229 fn should_leave(&self) -> bool {
230 self.leave_when_empty
231 && self.pending_room_update.is_none()
232 && self.pending_participants.is_empty()
233 && self.remote_participants.is_empty()
234 && self.pending_call_count == 0
235 }
236
237 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
238 if self.status.is_offline() {
239 return Err(anyhow!("room is offline"));
240 }
241
242 cx.notify();
243 cx.emit(Event::Left);
244 log::info!("leaving room");
245
246 for project in self.shared_projects.drain() {
247 if let Some(project) = project.upgrade(cx) {
248 project.update(cx, |project, cx| {
249 project.unshare(cx).log_err();
250 });
251 }
252 }
253 for project in self.joined_projects.drain() {
254 if let Some(project) = project.upgrade(cx) {
255 project.update(cx, |project, cx| {
256 project.disconnected_from_host(cx);
257 });
258 }
259 }
260
261 self.status = RoomStatus::Offline;
262 self.remote_participants.clear();
263 self.pending_participants.clear();
264 self.participant_user_ids.clear();
265 self.subscriptions.clear();
266 self.live_kit.take();
267 self.pending_room_update.take();
268 self.maintain_connection.take();
269 self.client.send(proto::LeaveRoom {})?;
270 Ok(())
271 }
272
273 async fn maintain_connection(
274 this: WeakModelHandle<Self>,
275 client: Arc<Client>,
276 mut cx: AsyncAppContext,
277 ) -> Result<()> {
278 let mut client_status = client.status();
279 loop {
280 let _ = client_status.try_recv();
281 let is_connected = client_status.borrow().is_connected();
282 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
283 if !is_connected || client_status.next().await.is_some() {
284 log::info!("detected client disconnection");
285
286 this.upgrade(&cx)
287 .ok_or_else(|| anyhow!("room was dropped"))?
288 .update(&mut cx, |this, cx| {
289 this.status = RoomStatus::Rejoining;
290 cx.notify();
291 });
292
293 // Wait for client to re-establish a connection to the server.
294 {
295 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
296 let client_reconnection = async {
297 let mut remaining_attempts = 3;
298 while remaining_attempts > 0 {
299 if client_status.borrow().is_connected() {
300 log::info!("client reconnected, attempting to rejoin room");
301
302 let Some(this) = this.upgrade(&cx) else { break };
303 if this
304 .update(&mut cx, |this, cx| this.rejoin(cx))
305 .await
306 .log_err()
307 .is_some()
308 {
309 return true;
310 } else {
311 remaining_attempts -= 1;
312 }
313 } else if client_status.borrow().is_signed_out() {
314 return false;
315 }
316
317 log::info!(
318 "waiting for client status change, remaining attempts {}",
319 remaining_attempts
320 );
321 client_status.next().await;
322 }
323 false
324 }
325 .fuse();
326 futures::pin_mut!(client_reconnection);
327
328 futures::select_biased! {
329 reconnected = client_reconnection => {
330 if reconnected {
331 log::info!("successfully reconnected to room");
332 // If we successfully joined the room, go back around the loop
333 // waiting for future connection status changes.
334 continue;
335 }
336 }
337 _ = reconnection_timeout => {
338 log::info!("room reconnection timeout expired");
339 }
340 }
341 }
342
343 break;
344 }
345 }
346
347 // The client failed to re-establish a connection to the server
348 // or an error occurred while trying to re-join the room. Either way
349 // we leave the room and return an error.
350 if let Some(this) = this.upgrade(&cx) {
351 log::info!("reconnection failed, leaving room");
352 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
353 }
354 Err(anyhow!(
355 "can't reconnect to room: client failed to re-establish connection"
356 ))
357 }
358
359 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
360 let mut projects = HashMap::default();
361 let mut reshared_projects = Vec::new();
362 let mut rejoined_projects = Vec::new();
363 self.shared_projects.retain(|project| {
364 if let Some(handle) = project.upgrade(cx) {
365 let project = handle.read(cx);
366 if let Some(project_id) = project.remote_id() {
367 projects.insert(project_id, handle.clone());
368 reshared_projects.push(proto::UpdateProject {
369 project_id,
370 worktrees: project.worktree_metadata_protos(cx),
371 });
372 return true;
373 }
374 }
375 false
376 });
377 self.joined_projects.retain(|project| {
378 if let Some(handle) = project.upgrade(cx) {
379 let project = handle.read(cx);
380 if let Some(project_id) = project.remote_id() {
381 projects.insert(project_id, handle.clone());
382 rejoined_projects.push(proto::RejoinProject {
383 id: project_id,
384 worktrees: project
385 .worktrees(cx)
386 .map(|worktree| {
387 let worktree = worktree.read(cx);
388 proto::RejoinWorktree {
389 id: worktree.id().to_proto(),
390 scan_id: worktree.completed_scan_id() as u64,
391 }
392 })
393 .collect(),
394 });
395 }
396 return true;
397 }
398 false
399 });
400
401 let response = self.client.request(proto::RejoinRoom {
402 id: self.id,
403 reshared_projects,
404 rejoined_projects,
405 });
406
407 cx.spawn(|this, mut cx| async move {
408 let response = response.await?;
409 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
410 this.update(&mut cx, |this, cx| {
411 this.status = RoomStatus::Online;
412 this.apply_room_update(room_proto, cx)?;
413
414 for reshared_project in response.reshared_projects {
415 if let Some(project) = projects.get(&reshared_project.id) {
416 project.update(cx, |project, cx| {
417 project.reshared(reshared_project, cx).log_err();
418 });
419 }
420 }
421
422 for rejoined_project in response.rejoined_projects {
423 if let Some(project) = projects.get(&rejoined_project.id) {
424 project.update(cx, |project, cx| {
425 project.rejoined(rejoined_project, cx).log_err();
426 });
427 }
428 }
429
430 anyhow::Ok(())
431 })
432 })
433 }
434
435 pub fn id(&self) -> u64 {
436 self.id
437 }
438
439 pub fn status(&self) -> RoomStatus {
440 self.status
441 }
442
443 pub fn local_participant(&self) -> &LocalParticipant {
444 &self.local_participant
445 }
446
447 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
448 &self.remote_participants
449 }
450
451 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
452 self.remote_participants
453 .values()
454 .find(|p| p.peer_id == peer_id)
455 }
456
457 pub fn pending_participants(&self) -> &[Arc<User>] {
458 &self.pending_participants
459 }
460
461 pub fn contains_participant(&self, user_id: u64) -> bool {
462 self.participant_user_ids.contains(&user_id)
463 }
464
465 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
466 self.follows_by_leader_id_project_id
467 .get(&(leader_id, project_id))
468 .map_or(&[], |v| v.as_slice())
469 }
470
471 async fn handle_room_updated(
472 this: ModelHandle<Self>,
473 envelope: TypedEnvelope<proto::RoomUpdated>,
474 _: Arc<Client>,
475 mut cx: AsyncAppContext,
476 ) -> Result<()> {
477 let room = envelope
478 .payload
479 .room
480 .ok_or_else(|| anyhow!("invalid room"))?;
481 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
482 }
483
484 fn apply_room_update(
485 &mut self,
486 mut room: proto::Room,
487 cx: &mut ModelContext<Self>,
488 ) -> Result<()> {
489 // Filter ourselves out from the room's participants.
490 let local_participant_ix = room
491 .participants
492 .iter()
493 .position(|participant| Some(participant.user_id) == self.client.user_id());
494 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
495
496 let pending_participant_user_ids = room
497 .pending_participants
498 .iter()
499 .map(|p| p.user_id)
500 .collect::<Vec<_>>();
501
502 let remote_participant_user_ids = room
503 .participants
504 .iter()
505 .map(|p| p.user_id)
506 .collect::<Vec<_>>();
507
508 let (remote_participants, pending_participants) =
509 self.user_store.update(cx, move |user_store, cx| {
510 (
511 user_store.get_users(remote_participant_user_ids, cx),
512 user_store.get_users(pending_participant_user_ids, cx),
513 )
514 });
515
516 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
517 let (remote_participants, pending_participants) =
518 futures::join!(remote_participants, pending_participants);
519
520 this.update(&mut cx, |this, cx| {
521 this.participant_user_ids.clear();
522
523 if let Some(participant) = local_participant {
524 this.local_participant.projects = participant.projects;
525 } else {
526 this.local_participant.projects.clear();
527 }
528
529 if let Some(participants) = remote_participants.log_err() {
530 for (participant, user) in room.participants.into_iter().zip(participants) {
531 let Some(peer_id) = participant.peer_id else { continue };
532 this.participant_user_ids.insert(participant.user_id);
533
534 let old_projects = this
535 .remote_participants
536 .get(&participant.user_id)
537 .into_iter()
538 .flat_map(|existing| &existing.projects)
539 .map(|project| project.id)
540 .collect::<HashSet<_>>();
541 let new_projects = participant
542 .projects
543 .iter()
544 .map(|project| project.id)
545 .collect::<HashSet<_>>();
546
547 for project in &participant.projects {
548 if !old_projects.contains(&project.id) {
549 cx.emit(Event::RemoteProjectShared {
550 owner: user.clone(),
551 project_id: project.id,
552 worktree_root_names: project.worktree_root_names.clone(),
553 });
554 }
555 }
556
557 for unshared_project_id in old_projects.difference(&new_projects) {
558 this.joined_projects.retain(|project| {
559 if let Some(project) = project.upgrade(cx) {
560 project.update(cx, |project, cx| {
561 if project.remote_id() == Some(*unshared_project_id) {
562 project.disconnected_from_host(cx);
563 false
564 } else {
565 true
566 }
567 })
568 } else {
569 false
570 }
571 });
572 cx.emit(Event::RemoteProjectUnshared {
573 project_id: *unshared_project_id,
574 });
575 }
576
577 let location = ParticipantLocation::from_proto(participant.location)
578 .unwrap_or(ParticipantLocation::External);
579 if let Some(remote_participant) =
580 this.remote_participants.get_mut(&participant.user_id)
581 {
582 remote_participant.projects = participant.projects;
583 remote_participant.peer_id = peer_id;
584 if location != remote_participant.location {
585 remote_participant.location = location;
586 cx.emit(Event::ParticipantLocationChanged {
587 participant_id: peer_id,
588 });
589 }
590 } else {
591 this.remote_participants.insert(
592 participant.user_id,
593 RemoteParticipant {
594 user: user.clone(),
595 peer_id,
596 projects: participant.projects,
597 location,
598 tracks: Default::default(),
599 },
600 );
601
602 if let Some(live_kit) = this.live_kit.as_ref() {
603 let tracks =
604 live_kit.room.remote_video_tracks(&peer_id.to_string());
605 for track in tracks {
606 this.remote_video_track_updated(
607 RemoteVideoTrackUpdate::Subscribed(track),
608 cx,
609 )
610 .log_err();
611 }
612 }
613 }
614 }
615
616 this.remote_participants.retain(|user_id, participant| {
617 if this.participant_user_ids.contains(user_id) {
618 true
619 } else {
620 for project in &participant.projects {
621 cx.emit(Event::RemoteProjectUnshared {
622 project_id: project.id,
623 });
624 }
625 false
626 }
627 });
628 }
629
630 if let Some(pending_participants) = pending_participants.log_err() {
631 this.pending_participants = pending_participants;
632 for participant in &this.pending_participants {
633 this.participant_user_ids.insert(participant.id);
634 }
635 }
636
637 this.follows_by_leader_id_project_id.clear();
638 for follower in room.followers {
639 let project_id = follower.project_id;
640 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
641 (Some(leader), Some(follower)) => (leader, follower),
642
643 _ => {
644 log::error!("Follower message {follower:?} missing some state");
645 continue;
646 }
647 };
648
649 let list = this
650 .follows_by_leader_id_project_id
651 .entry((leader, project_id))
652 .or_insert(Vec::new());
653 if !list.contains(&follower) {
654 list.push(follower);
655 }
656 }
657
658 this.pending_room_update.take();
659 if this.should_leave() {
660 log::info!("room is empty, leaving");
661 let _ = this.leave(cx);
662 }
663
664 this.check_invariants();
665 cx.notify();
666 });
667 }));
668
669 cx.notify();
670 Ok(())
671 }
672
673 fn remote_video_track_updated(
674 &mut self,
675 change: RemoteVideoTrackUpdate,
676 cx: &mut ModelContext<Self>,
677 ) -> Result<()> {
678 match change {
679 RemoteVideoTrackUpdate::Subscribed(track) => {
680 let user_id = track.publisher_id().parse()?;
681 let track_id = track.sid().to_string();
682 let participant = self
683 .remote_participants
684 .get_mut(&user_id)
685 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
686 participant.tracks.insert(
687 track_id.clone(),
688 Arc::new(RemoteVideoTrack {
689 live_kit_track: track,
690 }),
691 );
692 cx.emit(Event::RemoteVideoTracksChanged {
693 participant_id: participant.peer_id,
694 });
695 }
696 RemoteVideoTrackUpdate::Unsubscribed {
697 publisher_id,
698 track_id,
699 } => {
700 let user_id = publisher_id.parse()?;
701 let participant = self
702 .remote_participants
703 .get_mut(&user_id)
704 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
705 participant.tracks.remove(&track_id);
706 cx.emit(Event::RemoteVideoTracksChanged {
707 participant_id: participant.peer_id,
708 });
709 }
710 }
711
712 cx.notify();
713 Ok(())
714 }
715
716 fn check_invariants(&self) {
717 #[cfg(any(test, feature = "test-support"))]
718 {
719 for participant in self.remote_participants.values() {
720 assert!(self.participant_user_ids.contains(&participant.user.id));
721 assert_ne!(participant.user.id, self.client.user_id().unwrap());
722 }
723
724 for participant in &self.pending_participants {
725 assert!(self.participant_user_ids.contains(&participant.id));
726 assert_ne!(participant.id, self.client.user_id().unwrap());
727 }
728
729 assert_eq!(
730 self.participant_user_ids.len(),
731 self.remote_participants.len() + self.pending_participants.len()
732 );
733 }
734 }
735
736 pub(crate) fn call(
737 &mut self,
738 called_user_id: u64,
739 initial_project_id: Option<u64>,
740 cx: &mut ModelContext<Self>,
741 ) -> Task<Result<()>> {
742 if self.status.is_offline() {
743 return Task::ready(Err(anyhow!("room is offline")));
744 }
745
746 cx.notify();
747 let client = self.client.clone();
748 let room_id = self.id;
749 self.pending_call_count += 1;
750 cx.spawn(|this, mut cx| async move {
751 let result = client
752 .request(proto::Call {
753 room_id,
754 called_user_id,
755 initial_project_id,
756 })
757 .await;
758 this.update(&mut cx, |this, cx| {
759 this.pending_call_count -= 1;
760 if this.should_leave() {
761 this.leave(cx)?;
762 }
763 result
764 })?;
765 Ok(())
766 })
767 }
768
769 pub fn join_project(
770 &mut self,
771 id: u64,
772 language_registry: Arc<LanguageRegistry>,
773 fs: Arc<dyn Fs>,
774 cx: &mut ModelContext<Self>,
775 ) -> Task<Result<ModelHandle<Project>>> {
776 let client = self.client.clone();
777 let user_store = self.user_store.clone();
778 cx.spawn(|this, mut cx| async move {
779 let project =
780 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
781 this.update(&mut cx, |this, cx| {
782 this.joined_projects.retain(|project| {
783 if let Some(project) = project.upgrade(cx) {
784 !project.read(cx).is_read_only()
785 } else {
786 false
787 }
788 });
789 this.joined_projects.insert(project.downgrade());
790 });
791 Ok(project)
792 })
793 }
794
795 pub(crate) fn share_project(
796 &mut self,
797 project: ModelHandle<Project>,
798 cx: &mut ModelContext<Self>,
799 ) -> Task<Result<u64>> {
800 if let Some(project_id) = project.read(cx).remote_id() {
801 return Task::ready(Ok(project_id));
802 }
803
804 let request = self.client.request(proto::ShareProject {
805 room_id: self.id(),
806 worktrees: project.read(cx).worktree_metadata_protos(cx),
807 });
808 cx.spawn(|this, mut cx| async move {
809 let response = request.await?;
810
811 project.update(&mut cx, |project, cx| {
812 project.shared(response.project_id, cx)
813 })?;
814
815 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
816 this.update(&mut cx, |this, cx| {
817 this.shared_projects.insert(project.downgrade());
818 let active_project = this.local_participant.active_project.as_ref();
819 if active_project.map_or(false, |location| *location == project) {
820 this.set_location(Some(&project), cx)
821 } else {
822 Task::ready(Ok(()))
823 }
824 })
825 .await?;
826
827 Ok(response.project_id)
828 })
829 }
830
831 pub(crate) fn unshare_project(
832 &mut self,
833 project: ModelHandle<Project>,
834 cx: &mut ModelContext<Self>,
835 ) -> Result<()> {
836 let project_id = match project.read(cx).remote_id() {
837 Some(project_id) => project_id,
838 None => return Ok(()),
839 };
840
841 self.client.send(proto::UnshareProject { project_id })?;
842 project.update(cx, |this, cx| this.unshare(cx))
843 }
844
845 pub(crate) fn set_location(
846 &mut self,
847 project: Option<&ModelHandle<Project>>,
848 cx: &mut ModelContext<Self>,
849 ) -> Task<Result<()>> {
850 if self.status.is_offline() {
851 return Task::ready(Err(anyhow!("room is offline")));
852 }
853
854 let client = self.client.clone();
855 let room_id = self.id;
856 let location = if let Some(project) = project {
857 self.local_participant.active_project = Some(project.downgrade());
858 if let Some(project_id) = project.read(cx).remote_id() {
859 proto::participant_location::Variant::SharedProject(
860 proto::participant_location::SharedProject { id: project_id },
861 )
862 } else {
863 proto::participant_location::Variant::UnsharedProject(
864 proto::participant_location::UnsharedProject {},
865 )
866 }
867 } else {
868 self.local_participant.active_project = None;
869 proto::participant_location::Variant::External(proto::participant_location::External {})
870 };
871
872 cx.notify();
873 cx.foreground().spawn(async move {
874 client
875 .request(proto::UpdateParticipantLocation {
876 room_id,
877 location: Some(proto::ParticipantLocation {
878 variant: Some(location),
879 }),
880 })
881 .await?;
882 Ok(())
883 })
884 }
885
886 pub fn is_screen_sharing(&self) -> bool {
887 self.live_kit.as_ref().map_or(false, |live_kit| {
888 !matches!(live_kit.screen_track, ScreenTrack::None)
889 })
890 }
891
892 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
893 if self.status.is_offline() {
894 return Task::ready(Err(anyhow!("room is offline")));
895 } else if self.is_screen_sharing() {
896 return Task::ready(Err(anyhow!("screen was already shared")));
897 }
898
899 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
900 let publish_id = post_inc(&mut live_kit.next_publish_id);
901 live_kit.screen_track = ScreenTrack::Pending { publish_id };
902 cx.notify();
903 (live_kit.room.display_sources(), publish_id)
904 } else {
905 return Task::ready(Err(anyhow!("live-kit was not initialized")));
906 };
907
908 cx.spawn_weak(|this, mut cx| async move {
909 let publish_track = async {
910 let displays = displays.await?;
911 let display = displays
912 .first()
913 .ok_or_else(|| anyhow!("no display found"))?;
914 let track = LocalVideoTrack::screen_share_for_display(&display);
915 this.upgrade(&cx)
916 .ok_or_else(|| anyhow!("room was dropped"))?
917 .read_with(&cx, |this, _| {
918 this.live_kit
919 .as_ref()
920 .map(|live_kit| live_kit.room.publish_video_track(&track))
921 })
922 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
923 .await
924 };
925
926 let publication = publish_track.await;
927 this.upgrade(&cx)
928 .ok_or_else(|| anyhow!("room was dropped"))?
929 .update(&mut cx, |this, cx| {
930 let live_kit = this
931 .live_kit
932 .as_mut()
933 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
934
935 let canceled = if let ScreenTrack::Pending {
936 publish_id: cur_publish_id,
937 } = &live_kit.screen_track
938 {
939 *cur_publish_id != publish_id
940 } else {
941 true
942 };
943
944 match publication {
945 Ok(publication) => {
946 if canceled {
947 live_kit.room.unpublish_track(publication);
948 } else {
949 live_kit.screen_track = ScreenTrack::Published(publication);
950 cx.notify();
951 }
952 Ok(())
953 }
954 Err(error) => {
955 if canceled {
956 Ok(())
957 } else {
958 live_kit.screen_track = ScreenTrack::None;
959 cx.notify();
960 Err(error)
961 }
962 }
963 }
964 })
965 })
966 }
967
968 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
969 if self.status.is_offline() {
970 return Err(anyhow!("room is offline"));
971 }
972
973 let live_kit = self
974 .live_kit
975 .as_mut()
976 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
977 match mem::take(&mut live_kit.screen_track) {
978 ScreenTrack::None => Err(anyhow!("screen was not shared")),
979 ScreenTrack::Pending { .. } => {
980 cx.notify();
981 Ok(())
982 }
983 ScreenTrack::Published(track) => {
984 live_kit.room.unpublish_track(track);
985 cx.notify();
986 Ok(())
987 }
988 }
989 }
990
991 #[cfg(any(test, feature = "test-support"))]
992 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
993 self.live_kit
994 .as_ref()
995 .unwrap()
996 .room
997 .set_display_sources(sources);
998 }
999}
1000
1001struct LiveKitRoom {
1002 room: Arc<live_kit_client::Room>,
1003 screen_track: ScreenTrack,
1004 next_publish_id: usize,
1005 _maintain_room: Task<()>,
1006 _maintain_tracks: Task<()>,
1007}
1008
1009enum ScreenTrack {
1010 None,
1011 Pending { publish_id: usize },
1012 Published(LocalTrackPublication),
1013}
1014
1015impl Default for ScreenTrack {
1016 fn default() -> Self {
1017 Self::None
1018 }
1019}
1020
1021#[derive(Copy, Clone, PartialEq, Eq)]
1022pub enum RoomStatus {
1023 Online,
1024 Rejoining,
1025 Offline,
1026}
1027
1028impl RoomStatus {
1029 pub fn is_offline(&self) -> bool {
1030 matches!(self, RoomStatus::Offline)
1031 }
1032
1033 pub fn is_online(&self) -> bool {
1034 matches!(self, RoomStatus::Online)
1035 }
1036}