1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashMap, HashSet};
11use fs::Fs;
12use futures::{FutureExt, StreamExt};
13use gpui::{
14 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
15};
16use language::LanguageRegistry;
17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
18use postage::stream::Stream;
19use project::Project;
20use std::{mem, sync::Arc, time::Duration};
21use util::{post_inc, ResultExt, TryFutureExt};
22
23pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub enum Event {
27 ParticipantLocationChanged {
28 participant_id: proto::PeerId,
29 },
30 RemoteVideoTracksChanged {
31 participant_id: proto::PeerId,
32 },
33 RemoteProjectShared {
34 owner: Arc<User>,
35 project_id: u64,
36 worktree_root_names: Vec<String>,
37 },
38 RemoteProjectUnshared {
39 project_id: u64,
40 },
41 Left,
42}
43
44pub struct Room {
45 id: u64,
46 live_kit: Option<LiveKitRoom>,
47 status: RoomStatus,
48 shared_projects: HashSet<WeakModelHandle<Project>>,
49 joined_projects: HashSet<WeakModelHandle<Project>>,
50 local_participant: LocalParticipant,
51 remote_participants: BTreeMap<u64, RemoteParticipant>,
52 pending_participants: Vec<Arc<User>>,
53 participant_user_ids: HashSet<u64>,
54 pending_call_count: usize,
55 leave_when_empty: bool,
56 client: Arc<Client>,
57 user_store: ModelHandle<UserStore>,
58 subscriptions: Vec<client::Subscription>,
59 pending_room_update: Option<Task<()>>,
60 maintain_connection: Option<Task<Option<()>>>,
61}
62
63impl Entity for Room {
64 type Event = Event;
65
66 fn release(&mut self, _: &mut MutableAppContext) {
67 if self.status.is_online() {
68 log::info!("room was released, sending leave message");
69 let _ = self.client.send(proto::LeaveRoom {});
70 }
71 }
72}
73
74impl Room {
75 fn new(
76 id: u64,
77 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
78 client: Arc<Client>,
79 user_store: ModelHandle<UserStore>,
80 cx: &mut ModelContext<Self>,
81 ) -> Self {
82 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
83 let room = live_kit_client::Room::new();
84 let mut status = room.status();
85 // Consume the initial status of the room.
86 let _ = status.try_recv();
87 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
88 while let Some(status) = status.next().await {
89 let this = if let Some(this) = this.upgrade(&cx) {
90 this
91 } else {
92 break;
93 };
94
95 if status == live_kit_client::ConnectionState::Disconnected {
96 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
97 break;
98 }
99 }
100 });
101
102 let mut track_changes = room.remote_video_track_updates();
103 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
104 while let Some(track_change) = track_changes.next().await {
105 let this = if let Some(this) = this.upgrade(&cx) {
106 this
107 } else {
108 break;
109 };
110
111 this.update(&mut cx, |this, cx| {
112 this.remote_video_track_updated(track_change, cx).log_err()
113 });
114 }
115 });
116
117 cx.foreground()
118 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
119 .detach_and_log_err(cx);
120
121 Some(LiveKitRoom {
122 room,
123 screen_track: ScreenTrack::None,
124 next_publish_id: 0,
125 _maintain_room,
126 _maintain_tracks,
127 })
128 } else {
129 None
130 };
131
132 let maintain_connection =
133 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
134
135 Self {
136 id,
137 live_kit: live_kit_room,
138 status: RoomStatus::Online,
139 shared_projects: Default::default(),
140 joined_projects: Default::default(),
141 participant_user_ids: Default::default(),
142 local_participant: Default::default(),
143 remote_participants: Default::default(),
144 pending_participants: Default::default(),
145 pending_call_count: 0,
146 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
147 leave_when_empty: false,
148 pending_room_update: None,
149 client,
150 user_store,
151 maintain_connection: Some(maintain_connection),
152 }
153 }
154
155 pub(crate) fn create(
156 called_user_id: u64,
157 initial_project: Option<ModelHandle<Project>>,
158 client: Arc<Client>,
159 user_store: ModelHandle<UserStore>,
160 cx: &mut MutableAppContext,
161 ) -> Task<Result<ModelHandle<Self>>> {
162 cx.spawn(|mut cx| async move {
163 let response = client.request(proto::CreateRoom {}).await?;
164 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
165 let room = cx.add_model(|cx| {
166 Self::new(
167 room_proto.id,
168 response.live_kit_connection_info,
169 client,
170 user_store,
171 cx,
172 )
173 });
174
175 let initial_project_id = if let Some(initial_project) = initial_project {
176 let initial_project_id = room
177 .update(&mut cx, |room, cx| {
178 room.share_project(initial_project.clone(), cx)
179 })
180 .await?;
181 Some(initial_project_id)
182 } else {
183 None
184 };
185
186 match room
187 .update(&mut cx, |room, cx| {
188 room.leave_when_empty = true;
189 room.call(called_user_id, initial_project_id, cx)
190 })
191 .await
192 {
193 Ok(()) => Ok(room),
194 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
195 }
196 })
197 }
198
199 pub(crate) fn join(
200 call: &IncomingCall,
201 client: Arc<Client>,
202 user_store: ModelHandle<UserStore>,
203 cx: &mut MutableAppContext,
204 ) -> Task<Result<ModelHandle<Self>>> {
205 let room_id = call.room_id;
206 cx.spawn(|mut cx| async move {
207 let response = client.request(proto::JoinRoom { id: room_id }).await?;
208 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
209 let room = cx.add_model(|cx| {
210 Self::new(
211 room_id,
212 response.live_kit_connection_info,
213 client,
214 user_store,
215 cx,
216 )
217 });
218 room.update(&mut cx, |room, cx| {
219 room.leave_when_empty = true;
220 room.apply_room_update(room_proto, cx)?;
221 anyhow::Ok(())
222 })?;
223 Ok(room)
224 })
225 }
226
227 fn should_leave(&self) -> bool {
228 self.leave_when_empty
229 && self.pending_room_update.is_none()
230 && self.pending_participants.is_empty()
231 && self.remote_participants.is_empty()
232 && self.pending_call_count == 0
233 }
234
235 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
236 if self.status.is_offline() {
237 return Err(anyhow!("room is offline"));
238 }
239
240 cx.notify();
241 cx.emit(Event::Left);
242 log::info!("leaving room");
243
244 for project in self.shared_projects.drain() {
245 if let Some(project) = project.upgrade(cx) {
246 project.update(cx, |project, cx| {
247 project.unshare(cx).log_err();
248 });
249 }
250 }
251 for project in self.joined_projects.drain() {
252 if let Some(project) = project.upgrade(cx) {
253 project.update(cx, |project, cx| {
254 project.disconnected_from_host(cx);
255 });
256 }
257 }
258
259 self.status = RoomStatus::Offline;
260 self.remote_participants.clear();
261 self.pending_participants.clear();
262 self.participant_user_ids.clear();
263 self.subscriptions.clear();
264 self.live_kit.take();
265 self.pending_room_update.take();
266 self.maintain_connection.take();
267 self.client.send(proto::LeaveRoom {})?;
268 Ok(())
269 }
270
271 async fn maintain_connection(
272 this: WeakModelHandle<Self>,
273 client: Arc<Client>,
274 mut cx: AsyncAppContext,
275 ) -> Result<()> {
276 let mut client_status = client.status();
277 loop {
278 let is_connected = client_status
279 .next()
280 .await
281 .map_or(false, |s| s.is_connected());
282
283 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
284 if !is_connected || client_status.next().await.is_some() {
285 log::info!("detected client disconnection");
286 this.upgrade(&cx)
287 .ok_or_else(|| anyhow!("room was dropped"))?
288 .update(&mut cx, |this, cx| {
289 this.status = RoomStatus::Rejoining;
290 cx.notify();
291 });
292
293 // Wait for client to re-establish a connection to the server.
294 {
295 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
296 let client_reconnection = async {
297 let mut remaining_attempts = 3;
298 while remaining_attempts > 0 {
299 log::info!(
300 "waiting for client status change, remaining attempts {}",
301 remaining_attempts
302 );
303 let Some(status) = client_status.next().await else { break };
304 if status.is_connected() {
305 log::info!("client reconnected, attempting to rejoin room");
306
307 let Some(this) = this.upgrade(&cx) else { break };
308 if this
309 .update(&mut cx, |this, cx| this.rejoin(cx))
310 .await
311 .log_err()
312 .is_some()
313 {
314 return true;
315 } else {
316 remaining_attempts -= 1;
317 }
318 }
319 }
320 false
321 }
322 .fuse();
323 futures::pin_mut!(client_reconnection);
324
325 futures::select_biased! {
326 reconnected = client_reconnection => {
327 if reconnected {
328 log::info!("successfully reconnected to room");
329 // If we successfully joined the room, go back around the loop
330 // waiting for future connection status changes.
331 continue;
332 }
333 }
334 _ = reconnection_timeout => {
335 log::info!("room reconnection timeout expired");
336 }
337 }
338 }
339
340 // The client failed to re-establish a connection to the server
341 // or an error occurred while trying to re-join the room. Either way
342 // we leave the room and return an error.
343 if let Some(this) = this.upgrade(&cx) {
344 log::info!("reconnection failed, leaving room");
345 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
346 }
347 return Err(anyhow!(
348 "can't reconnect to room: client failed to re-establish connection"
349 ));
350 }
351 }
352 }
353
354 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
355 let mut projects = HashMap::default();
356 let mut reshared_projects = Vec::new();
357 let mut rejoined_projects = Vec::new();
358 self.shared_projects.retain(|project| {
359 if let Some(handle) = project.upgrade(cx) {
360 let project = handle.read(cx);
361 if let Some(project_id) = project.remote_id() {
362 projects.insert(project_id, handle.clone());
363 reshared_projects.push(proto::UpdateProject {
364 project_id,
365 worktrees: project.worktree_metadata_protos(cx),
366 });
367 return true;
368 }
369 }
370 false
371 });
372 self.joined_projects.retain(|project| {
373 if let Some(handle) = project.upgrade(cx) {
374 let project = handle.read(cx);
375 if let Some(project_id) = project.remote_id() {
376 projects.insert(project_id, handle.clone());
377 rejoined_projects.push(proto::RejoinProject {
378 id: project_id,
379 worktrees: project
380 .worktrees(cx)
381 .map(|worktree| {
382 let worktree = worktree.read(cx);
383 proto::RejoinWorktree {
384 id: worktree.id().to_proto(),
385 scan_id: worktree.completed_scan_id() as u64,
386 }
387 })
388 .collect(),
389 });
390 }
391 return true;
392 }
393 false
394 });
395
396 let response = self.client.request(proto::RejoinRoom {
397 id: self.id,
398 reshared_projects,
399 rejoined_projects,
400 });
401
402 cx.spawn(|this, mut cx| async move {
403 let response = response.await?;
404 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
405 this.update(&mut cx, |this, cx| {
406 this.status = RoomStatus::Online;
407 this.apply_room_update(room_proto, cx)?;
408
409 for reshared_project in response.reshared_projects {
410 if let Some(project) = projects.get(&reshared_project.id) {
411 project.update(cx, |project, cx| {
412 project.reshared(reshared_project, cx).log_err();
413 });
414 }
415 }
416
417 for rejoined_project in response.rejoined_projects {
418 if let Some(project) = projects.get(&rejoined_project.id) {
419 project.update(cx, |project, cx| {
420 project.rejoined(rejoined_project, cx).log_err();
421 });
422 }
423 }
424
425 anyhow::Ok(())
426 })
427 })
428 }
429
430 pub fn id(&self) -> u64 {
431 self.id
432 }
433
434 pub fn status(&self) -> RoomStatus {
435 self.status
436 }
437
438 pub fn local_participant(&self) -> &LocalParticipant {
439 &self.local_participant
440 }
441
442 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
443 &self.remote_participants
444 }
445
446 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
447 self.remote_participants
448 .values()
449 .find(|p| p.peer_id == peer_id)
450 }
451
452 pub fn pending_participants(&self) -> &[Arc<User>] {
453 &self.pending_participants
454 }
455
456 pub fn contains_participant(&self, user_id: u64) -> bool {
457 self.participant_user_ids.contains(&user_id)
458 }
459
460 async fn handle_room_updated(
461 this: ModelHandle<Self>,
462 envelope: TypedEnvelope<proto::RoomUpdated>,
463 _: Arc<Client>,
464 mut cx: AsyncAppContext,
465 ) -> Result<()> {
466 let room = envelope
467 .payload
468 .room
469 .ok_or_else(|| anyhow!("invalid room"))?;
470 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
471 }
472
473 fn apply_room_update(
474 &mut self,
475 mut room: proto::Room,
476 cx: &mut ModelContext<Self>,
477 ) -> Result<()> {
478 // Filter ourselves out from the room's participants.
479 let local_participant_ix = room
480 .participants
481 .iter()
482 .position(|participant| Some(participant.user_id) == self.client.user_id());
483 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
484
485 let pending_participant_user_ids = room
486 .pending_participants
487 .iter()
488 .map(|p| p.user_id)
489 .collect::<Vec<_>>();
490 let remote_participant_user_ids = room
491 .participants
492 .iter()
493 .map(|p| p.user_id)
494 .collect::<Vec<_>>();
495 let (remote_participants, pending_participants) =
496 self.user_store.update(cx, move |user_store, cx| {
497 (
498 user_store.get_users(remote_participant_user_ids, cx),
499 user_store.get_users(pending_participant_user_ids, cx),
500 )
501 });
502 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
503 let (remote_participants, pending_participants) =
504 futures::join!(remote_participants, pending_participants);
505
506 this.update(&mut cx, |this, cx| {
507 this.participant_user_ids.clear();
508
509 if let Some(participant) = local_participant {
510 this.local_participant.projects = participant.projects;
511 } else {
512 this.local_participant.projects.clear();
513 }
514
515 if let Some(participants) = remote_participants.log_err() {
516 for (participant, user) in room.participants.into_iter().zip(participants) {
517 let Some(peer_id) = participant.peer_id else { continue };
518 this.participant_user_ids.insert(participant.user_id);
519
520 let old_projects = this
521 .remote_participants
522 .get(&participant.user_id)
523 .into_iter()
524 .flat_map(|existing| &existing.projects)
525 .map(|project| project.id)
526 .collect::<HashSet<_>>();
527 let new_projects = participant
528 .projects
529 .iter()
530 .map(|project| project.id)
531 .collect::<HashSet<_>>();
532
533 for project in &participant.projects {
534 if !old_projects.contains(&project.id) {
535 cx.emit(Event::RemoteProjectShared {
536 owner: user.clone(),
537 project_id: project.id,
538 worktree_root_names: project.worktree_root_names.clone(),
539 });
540 }
541 }
542
543 for unshared_project_id in old_projects.difference(&new_projects) {
544 this.joined_projects.retain(|project| {
545 if let Some(project) = project.upgrade(cx) {
546 project.update(cx, |project, cx| {
547 if project.remote_id() == Some(*unshared_project_id) {
548 project.disconnected_from_host(cx);
549 false
550 } else {
551 true
552 }
553 })
554 } else {
555 false
556 }
557 });
558 cx.emit(Event::RemoteProjectUnshared {
559 project_id: *unshared_project_id,
560 });
561 }
562
563 let location = ParticipantLocation::from_proto(participant.location)
564 .unwrap_or(ParticipantLocation::External);
565 if let Some(remote_participant) =
566 this.remote_participants.get_mut(&participant.user_id)
567 {
568 remote_participant.projects = participant.projects;
569 remote_participant.peer_id = peer_id;
570 if location != remote_participant.location {
571 remote_participant.location = location;
572 cx.emit(Event::ParticipantLocationChanged {
573 participant_id: peer_id,
574 });
575 }
576 } else {
577 this.remote_participants.insert(
578 participant.user_id,
579 RemoteParticipant {
580 user: user.clone(),
581 peer_id,
582 projects: participant.projects,
583 location,
584 tracks: Default::default(),
585 },
586 );
587
588 if let Some(live_kit) = this.live_kit.as_ref() {
589 let tracks =
590 live_kit.room.remote_video_tracks(&peer_id.to_string());
591 for track in tracks {
592 this.remote_video_track_updated(
593 RemoteVideoTrackUpdate::Subscribed(track),
594 cx,
595 )
596 .log_err();
597 }
598 }
599 }
600 }
601
602 this.remote_participants.retain(|user_id, participant| {
603 if this.participant_user_ids.contains(user_id) {
604 true
605 } else {
606 for project in &participant.projects {
607 cx.emit(Event::RemoteProjectUnshared {
608 project_id: project.id,
609 });
610 }
611 false
612 }
613 });
614 }
615
616 if let Some(pending_participants) = pending_participants.log_err() {
617 this.pending_participants = pending_participants;
618 for participant in &this.pending_participants {
619 this.participant_user_ids.insert(participant.id);
620 }
621 }
622
623 this.pending_room_update.take();
624 if this.should_leave() {
625 log::info!("room is empty, leaving");
626 let _ = this.leave(cx);
627 }
628
629 this.check_invariants();
630 cx.notify();
631 });
632 }));
633
634 cx.notify();
635 Ok(())
636 }
637
638 fn remote_video_track_updated(
639 &mut self,
640 change: RemoteVideoTrackUpdate,
641 cx: &mut ModelContext<Self>,
642 ) -> Result<()> {
643 match change {
644 RemoteVideoTrackUpdate::Subscribed(track) => {
645 let user_id = track.publisher_id().parse()?;
646 let track_id = track.sid().to_string();
647 let participant = self
648 .remote_participants
649 .get_mut(&user_id)
650 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
651 participant.tracks.insert(
652 track_id.clone(),
653 Arc::new(RemoteVideoTrack {
654 live_kit_track: track,
655 }),
656 );
657 cx.emit(Event::RemoteVideoTracksChanged {
658 participant_id: participant.peer_id,
659 });
660 }
661 RemoteVideoTrackUpdate::Unsubscribed {
662 publisher_id,
663 track_id,
664 } => {
665 let user_id = publisher_id.parse()?;
666 let participant = self
667 .remote_participants
668 .get_mut(&user_id)
669 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
670 participant.tracks.remove(&track_id);
671 cx.emit(Event::RemoteVideoTracksChanged {
672 participant_id: participant.peer_id,
673 });
674 }
675 }
676
677 cx.notify();
678 Ok(())
679 }
680
681 fn check_invariants(&self) {
682 #[cfg(any(test, feature = "test-support"))]
683 {
684 for participant in self.remote_participants.values() {
685 assert!(self.participant_user_ids.contains(&participant.user.id));
686 assert_ne!(participant.user.id, self.client.user_id().unwrap());
687 }
688
689 for participant in &self.pending_participants {
690 assert!(self.participant_user_ids.contains(&participant.id));
691 assert_ne!(participant.id, self.client.user_id().unwrap());
692 }
693
694 assert_eq!(
695 self.participant_user_ids.len(),
696 self.remote_participants.len() + self.pending_participants.len()
697 );
698 }
699 }
700
701 pub(crate) fn call(
702 &mut self,
703 called_user_id: u64,
704 initial_project_id: Option<u64>,
705 cx: &mut ModelContext<Self>,
706 ) -> Task<Result<()>> {
707 if self.status.is_offline() {
708 return Task::ready(Err(anyhow!("room is offline")));
709 }
710
711 cx.notify();
712 let client = self.client.clone();
713 let room_id = self.id;
714 self.pending_call_count += 1;
715 cx.spawn(|this, mut cx| async move {
716 let result = client
717 .request(proto::Call {
718 room_id,
719 called_user_id,
720 initial_project_id,
721 })
722 .await;
723 this.update(&mut cx, |this, cx| {
724 this.pending_call_count -= 1;
725 if this.should_leave() {
726 this.leave(cx)?;
727 }
728 result
729 })?;
730 Ok(())
731 })
732 }
733
734 pub fn join_project(
735 &mut self,
736 id: u64,
737 language_registry: Arc<LanguageRegistry>,
738 fs: Arc<dyn Fs>,
739 cx: &mut ModelContext<Self>,
740 ) -> Task<Result<ModelHandle<Project>>> {
741 let client = self.client.clone();
742 let user_store = self.user_store.clone();
743 cx.spawn(|this, mut cx| async move {
744 let project =
745 Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
746 this.update(&mut cx, |this, cx| {
747 this.joined_projects.retain(|project| {
748 if let Some(project) = project.upgrade(cx) {
749 !project.read(cx).is_read_only()
750 } else {
751 false
752 }
753 });
754 this.joined_projects.insert(project.downgrade());
755 });
756 Ok(project)
757 })
758 }
759
760 pub(crate) fn share_project(
761 &mut self,
762 project: ModelHandle<Project>,
763 cx: &mut ModelContext<Self>,
764 ) -> Task<Result<u64>> {
765 if let Some(project_id) = project.read(cx).remote_id() {
766 return Task::ready(Ok(project_id));
767 }
768
769 let request = self.client.request(proto::ShareProject {
770 room_id: self.id(),
771 worktrees: project.read(cx).worktree_metadata_protos(cx),
772 });
773 cx.spawn(|this, mut cx| async move {
774 let response = request.await?;
775
776 project.update(&mut cx, |project, cx| {
777 project.shared(response.project_id, cx)
778 })?;
779
780 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
781 this.update(&mut cx, |this, cx| {
782 this.shared_projects.insert(project.downgrade());
783 let active_project = this.local_participant.active_project.as_ref();
784 if active_project.map_or(false, |location| *location == project) {
785 this.set_location(Some(&project), cx)
786 } else {
787 Task::ready(Ok(()))
788 }
789 })
790 .await?;
791
792 Ok(response.project_id)
793 })
794 }
795
796 pub(crate) fn set_location(
797 &mut self,
798 project: Option<&ModelHandle<Project>>,
799 cx: &mut ModelContext<Self>,
800 ) -> Task<Result<()>> {
801 if self.status.is_offline() {
802 return Task::ready(Err(anyhow!("room is offline")));
803 }
804
805 let client = self.client.clone();
806 let room_id = self.id;
807 let location = if let Some(project) = project {
808 self.local_participant.active_project = Some(project.downgrade());
809 if let Some(project_id) = project.read(cx).remote_id() {
810 proto::participant_location::Variant::SharedProject(
811 proto::participant_location::SharedProject { id: project_id },
812 )
813 } else {
814 proto::participant_location::Variant::UnsharedProject(
815 proto::participant_location::UnsharedProject {},
816 )
817 }
818 } else {
819 self.local_participant.active_project = None;
820 proto::participant_location::Variant::External(proto::participant_location::External {})
821 };
822
823 cx.notify();
824 cx.foreground().spawn(async move {
825 client
826 .request(proto::UpdateParticipantLocation {
827 room_id,
828 location: Some(proto::ParticipantLocation {
829 variant: Some(location),
830 }),
831 })
832 .await?;
833 Ok(())
834 })
835 }
836
837 pub fn is_screen_sharing(&self) -> bool {
838 self.live_kit.as_ref().map_or(false, |live_kit| {
839 !matches!(live_kit.screen_track, ScreenTrack::None)
840 })
841 }
842
843 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
844 if self.status.is_offline() {
845 return Task::ready(Err(anyhow!("room is offline")));
846 } else if self.is_screen_sharing() {
847 return Task::ready(Err(anyhow!("screen was already shared")));
848 }
849
850 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
851 let publish_id = post_inc(&mut live_kit.next_publish_id);
852 live_kit.screen_track = ScreenTrack::Pending { publish_id };
853 cx.notify();
854 (live_kit.room.display_sources(), publish_id)
855 } else {
856 return Task::ready(Err(anyhow!("live-kit was not initialized")));
857 };
858
859 cx.spawn_weak(|this, mut cx| async move {
860 let publish_track = async {
861 let displays = displays.await?;
862 let display = displays
863 .first()
864 .ok_or_else(|| anyhow!("no display found"))?;
865 let track = LocalVideoTrack::screen_share_for_display(&display);
866 this.upgrade(&cx)
867 .ok_or_else(|| anyhow!("room was dropped"))?
868 .read_with(&cx, |this, _| {
869 this.live_kit
870 .as_ref()
871 .map(|live_kit| live_kit.room.publish_video_track(&track))
872 })
873 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
874 .await
875 };
876
877 let publication = publish_track.await;
878 this.upgrade(&cx)
879 .ok_or_else(|| anyhow!("room was dropped"))?
880 .update(&mut cx, |this, cx| {
881 let live_kit = this
882 .live_kit
883 .as_mut()
884 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
885
886 let canceled = if let ScreenTrack::Pending {
887 publish_id: cur_publish_id,
888 } = &live_kit.screen_track
889 {
890 *cur_publish_id != publish_id
891 } else {
892 true
893 };
894
895 match publication {
896 Ok(publication) => {
897 if canceled {
898 live_kit.room.unpublish_track(publication);
899 } else {
900 live_kit.screen_track = ScreenTrack::Published(publication);
901 cx.notify();
902 }
903 Ok(())
904 }
905 Err(error) => {
906 if canceled {
907 Ok(())
908 } else {
909 live_kit.screen_track = ScreenTrack::None;
910 cx.notify();
911 Err(error)
912 }
913 }
914 }
915 })
916 })
917 }
918
919 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
920 if self.status.is_offline() {
921 return Err(anyhow!("room is offline"));
922 }
923
924 let live_kit = self
925 .live_kit
926 .as_mut()
927 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
928 match mem::take(&mut live_kit.screen_track) {
929 ScreenTrack::None => Err(anyhow!("screen was not shared")),
930 ScreenTrack::Pending { .. } => {
931 cx.notify();
932 Ok(())
933 }
934 ScreenTrack::Published(track) => {
935 live_kit.room.unpublish_track(track);
936 cx.notify();
937 Ok(())
938 }
939 }
940 }
941
/// Test helper: forwards `sources` to the LiveKit room's display sources.
///
/// # Panics
///
/// Panics if LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
950}
951
952struct LiveKitRoom {
953 room: Arc<live_kit_client::Room>,
954 screen_track: ScreenTrack,
955 next_publish_id: usize,
956 _maintain_room: Task<()>,
957 _maintain_tracks: Task<()>,
958}
959
960enum ScreenTrack {
961 None,
962 Pending { publish_id: usize },
963 Published(LocalTrackPublication),
964}
965
966impl Default for ScreenTrack {
967 fn default() -> Self {
968 Self::None
969 }
970}
971
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined and connected.
    Online,
    /// The client disconnected and the room is trying to rejoin.
    Rejoining,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` is neither
    /// online nor offline.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}