1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{
7 proto::{self, PeerId},
8 Client, TypedEnvelope, User, UserStore,
9};
10use collections::{BTreeMap, HashSet};
11use futures::{FutureExt, StreamExt};
12use gpui::{
13 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
14};
15use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
16use postage::stream::Stream;
17use project::Project;
18use std::{mem, sync::Arc, time::Duration};
19use util::{post_inc, ResultExt, TryFutureExt};
20
/// How long a room waits for the client to reconnect and rejoin after a
/// detected disconnection before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
22
23#[derive(Clone, Debug, PartialEq, Eq)]
24pub enum Event {
25 ParticipantLocationChanged {
26 participant_id: proto::PeerId,
27 },
28 RemoteVideoTracksChanged {
29 participant_id: proto::PeerId,
30 },
31 RemoteProjectShared {
32 owner: Arc<User>,
33 project_id: u64,
34 worktree_root_names: Vec<String>,
35 },
36 RemoteProjectUnshared {
37 project_id: u64,
38 },
39 Left,
40}
41
/// Client-side model of a call room: its participants, the optional LiveKit
/// session used for screen sharing, and the tasks that keep it in sync with
/// the server.
pub struct Room {
    /// Server-assigned room id, sent with room-scoped requests.
    id: u64,
    /// LiveKit session state; `None` when no connection info was provided or
    /// after the room was left.
    live_kit: Option<LiveKitRoom>,
    /// Whether the room is online, rejoining after a disconnect, or offline.
    status: RoomStatus,
    /// State of the local user within the room.
    local_participant: LocalParticipant,
    /// Participants that joined the room, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed by the server as pending participants.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have not completed yet.
    pending_call_count: usize,
    /// When set, the room is left as soon as nobody else is in it or being called.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Message-handler subscriptions; cleared when leaving the room.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, if one is still in flight.
    pending_room_update: Option<Task<()>>,
    /// Task watching the client's connection status and rejoining the room.
    maintain_connection: Option<Task<Option<()>>>,
}
58
59impl Entity for Room {
60 type Event = Event;
61
62 fn release(&mut self, _: &mut MutableAppContext) {
63 if self.status.is_online() {
64 log::info!("room was released, sending leave message");
65 self.client.send(proto::LeaveRoom {}).log_err();
66 }
67 }
68}
69
70impl Room {
71 fn new(
72 id: u64,
73 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
74 client: Arc<Client>,
75 user_store: ModelHandle<UserStore>,
76 cx: &mut ModelContext<Self>,
77 ) -> Self {
78 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
79 let room = live_kit_client::Room::new();
80 let mut status = room.status();
81 // Consume the initial status of the room.
82 let _ = status.try_recv();
83 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
84 while let Some(status) = status.next().await {
85 let this = if let Some(this) = this.upgrade(&cx) {
86 this
87 } else {
88 break;
89 };
90
91 if status == live_kit_client::ConnectionState::Disconnected {
92 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
93 break;
94 }
95 }
96 });
97
98 let mut track_changes = room.remote_video_track_updates();
99 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
100 while let Some(track_change) = track_changes.next().await {
101 let this = if let Some(this) = this.upgrade(&cx) {
102 this
103 } else {
104 break;
105 };
106
107 this.update(&mut cx, |this, cx| {
108 this.remote_video_track_updated(track_change, cx).log_err()
109 });
110 }
111 });
112
113 cx.foreground()
114 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
115 .detach_and_log_err(cx);
116
117 Some(LiveKitRoom {
118 room,
119 screen_track: ScreenTrack::None,
120 next_publish_id: 0,
121 _maintain_room,
122 _maintain_tracks,
123 })
124 } else {
125 None
126 };
127
128 let maintain_connection =
129 cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
130
131 Self {
132 id,
133 live_kit: live_kit_room,
134 status: RoomStatus::Online,
135 participant_user_ids: Default::default(),
136 local_participant: Default::default(),
137 remote_participants: Default::default(),
138 pending_participants: Default::default(),
139 pending_call_count: 0,
140 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
141 leave_when_empty: false,
142 pending_room_update: None,
143 client,
144 user_store,
145 maintain_connection: Some(maintain_connection),
146 }
147 }
148
149 pub(crate) fn create(
150 called_user_id: u64,
151 initial_project: Option<ModelHandle<Project>>,
152 client: Arc<Client>,
153 user_store: ModelHandle<UserStore>,
154 cx: &mut MutableAppContext,
155 ) -> Task<Result<ModelHandle<Self>>> {
156 cx.spawn(|mut cx| async move {
157 let response = client.request(proto::CreateRoom {}).await?;
158 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
159 let room = cx.add_model(|cx| {
160 Self::new(
161 room_proto.id,
162 response.live_kit_connection_info,
163 client,
164 user_store,
165 cx,
166 )
167 });
168
169 let initial_project_id = if let Some(initial_project) = initial_project {
170 let initial_project_id = room
171 .update(&mut cx, |room, cx| {
172 room.share_project(initial_project.clone(), cx)
173 })
174 .await?;
175 Some(initial_project_id)
176 } else {
177 None
178 };
179
180 match room
181 .update(&mut cx, |room, cx| {
182 room.leave_when_empty = true;
183 room.call(called_user_id, initial_project_id, cx)
184 })
185 .await
186 {
187 Ok(()) => Ok(room),
188 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
189 }
190 })
191 }
192
193 pub(crate) fn join(
194 call: &IncomingCall,
195 client: Arc<Client>,
196 user_store: ModelHandle<UserStore>,
197 cx: &mut MutableAppContext,
198 ) -> Task<Result<ModelHandle<Self>>> {
199 let room_id = call.room_id;
200 cx.spawn(|mut cx| async move {
201 let response = client.request(proto::JoinRoom { id: room_id }).await?;
202 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
203 let room = cx.add_model(|cx| {
204 Self::new(
205 room_id,
206 response.live_kit_connection_info,
207 client,
208 user_store,
209 cx,
210 )
211 });
212 room.update(&mut cx, |room, cx| {
213 room.leave_when_empty = true;
214 room.apply_room_update(room_proto, cx)?;
215 anyhow::Ok(())
216 })?;
217 Ok(room)
218 })
219 }
220
221 fn should_leave(&self) -> bool {
222 self.leave_when_empty
223 && self.pending_room_update.is_none()
224 && self.pending_participants.is_empty()
225 && self.remote_participants.is_empty()
226 && self.pending_call_count == 0
227 }
228
229 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
230 if self.status.is_offline() {
231 return Err(anyhow!("room is offline"));
232 }
233
234 cx.notify();
235 cx.emit(Event::Left);
236 log::info!("leaving room");
237 self.status = RoomStatus::Offline;
238 self.remote_participants.clear();
239 self.pending_participants.clear();
240 self.participant_user_ids.clear();
241 self.subscriptions.clear();
242 self.live_kit.take();
243 self.pending_room_update.take();
244 self.maintain_connection.take();
245 self.client.send(proto::LeaveRoom {})?;
246 Ok(())
247 }
248
249 async fn maintain_connection(
250 this: WeakModelHandle<Self>,
251 client: Arc<Client>,
252 mut cx: AsyncAppContext,
253 ) -> Result<()> {
254 let mut client_status = client.status();
255 loop {
256 let is_connected = client_status
257 .next()
258 .await
259 .map_or(false, |s| s.is_connected());
260 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
261 if !is_connected || client_status.next().await.is_some() {
262 log::info!("detected client disconnection");
263 let room_id = this
264 .upgrade(&cx)
265 .ok_or_else(|| anyhow!("room was dropped"))?
266 .update(&mut cx, |this, cx| {
267 this.status = RoomStatus::Rejoining;
268 cx.notify();
269 this.id
270 });
271
272 // Wait for client to re-establish a connection to the server.
273 {
274 let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
275 let client_reconnection = async {
276 let mut remaining_attempts = 3;
277 while remaining_attempts > 0 {
278 log::info!(
279 "waiting for client status change, remaining attempts {}",
280 remaining_attempts
281 );
282 if let Some(status) = client_status.next().await {
283 if status.is_connected() {
284 log::info!("client reconnected, attempting to rejoin room");
285 let rejoin_room = async {
286 let response =
287 client.request(proto::JoinRoom { id: room_id }).await?;
288 let room_proto =
289 response.room.ok_or_else(|| anyhow!("invalid room"))?;
290 this.upgrade(&cx)
291 .ok_or_else(|| anyhow!("room was dropped"))?
292 .update(&mut cx, |this, cx| {
293 this.status = RoomStatus::Online;
294 this.apply_room_update(room_proto, cx)
295 })?;
296 anyhow::Ok(())
297 };
298
299 if rejoin_room.await.log_err().is_some() {
300 return true;
301 } else {
302 remaining_attempts -= 1;
303 }
304 }
305 } else {
306 return false;
307 }
308 }
309 false
310 }
311 .fuse();
312 futures::pin_mut!(client_reconnection);
313
314 futures::select_biased! {
315 reconnected = client_reconnection => {
316 if reconnected {
317 log::info!("successfully reconnected to room");
318 // If we successfully joined the room, go back around the loop
319 // waiting for future connection status changes.
320 continue;
321 }
322 }
323 _ = reconnection_timeout => {
324 log::info!("room reconnection timeout expired");
325 }
326 }
327 }
328
329 // The client failed to re-establish a connection to the server
330 // or an error occurred while trying to re-join the room. Either way
331 // we leave the room and return an error.
332 if let Some(this) = this.upgrade(&cx) {
333 log::info!("reconnection failed, leaving room");
334 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
335 }
336 return Err(anyhow!(
337 "can't reconnect to room: client failed to re-establish connection"
338 ));
339 }
340 }
341 }
342
343 pub fn id(&self) -> u64 {
344 self.id
345 }
346
347 pub fn status(&self) -> RoomStatus {
348 self.status
349 }
350
351 pub fn local_participant(&self) -> &LocalParticipant {
352 &self.local_participant
353 }
354
355 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
356 &self.remote_participants
357 }
358
359 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
360 self.remote_participants
361 .values()
362 .find(|p| p.peer_id == peer_id)
363 }
364
365 pub fn pending_participants(&self) -> &[Arc<User>] {
366 &self.pending_participants
367 }
368
369 pub fn contains_participant(&self, user_id: u64) -> bool {
370 self.participant_user_ids.contains(&user_id)
371 }
372
373 async fn handle_room_updated(
374 this: ModelHandle<Self>,
375 envelope: TypedEnvelope<proto::RoomUpdated>,
376 _: Arc<Client>,
377 mut cx: AsyncAppContext,
378 ) -> Result<()> {
379 let room = envelope
380 .payload
381 .room
382 .ok_or_else(|| anyhow!("invalid room"))?;
383 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
384 }
385
386 fn apply_room_update(
387 &mut self,
388 mut room: proto::Room,
389 cx: &mut ModelContext<Self>,
390 ) -> Result<()> {
391 // Filter ourselves out from the room's participants.
392 let local_participant_ix = room
393 .participants
394 .iter()
395 .position(|participant| Some(participant.user_id) == self.client.user_id());
396 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
397
398 let pending_participant_user_ids = room
399 .pending_participants
400 .iter()
401 .map(|p| p.user_id)
402 .collect::<Vec<_>>();
403 let remote_participant_user_ids = room
404 .participants
405 .iter()
406 .map(|p| p.user_id)
407 .collect::<Vec<_>>();
408 let (remote_participants, pending_participants) =
409 self.user_store.update(cx, move |user_store, cx| {
410 (
411 user_store.get_users(remote_participant_user_ids, cx),
412 user_store.get_users(pending_participant_user_ids, cx),
413 )
414 });
415 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
416 let (remote_participants, pending_participants) =
417 futures::join!(remote_participants, pending_participants);
418
419 this.update(&mut cx, |this, cx| {
420 this.participant_user_ids.clear();
421
422 if let Some(participant) = local_participant {
423 this.local_participant.projects = participant.projects;
424 } else {
425 this.local_participant.projects.clear();
426 }
427
428 if let Some(participants) = remote_participants.log_err() {
429 for (participant, user) in room.participants.into_iter().zip(participants) {
430 let Some(peer_id) = participant.peer_id else { continue };
431 this.participant_user_ids.insert(participant.user_id);
432
433 let old_projects = this
434 .remote_participants
435 .get(&participant.user_id)
436 .into_iter()
437 .flat_map(|existing| &existing.projects)
438 .map(|project| project.id)
439 .collect::<HashSet<_>>();
440 let new_projects = participant
441 .projects
442 .iter()
443 .map(|project| project.id)
444 .collect::<HashSet<_>>();
445
446 for project in &participant.projects {
447 if !old_projects.contains(&project.id) {
448 cx.emit(Event::RemoteProjectShared {
449 owner: user.clone(),
450 project_id: project.id,
451 worktree_root_names: project.worktree_root_names.clone(),
452 });
453 }
454 }
455
456 for unshared_project_id in old_projects.difference(&new_projects) {
457 cx.emit(Event::RemoteProjectUnshared {
458 project_id: *unshared_project_id,
459 });
460 }
461
462 let location = ParticipantLocation::from_proto(participant.location)
463 .unwrap_or(ParticipantLocation::External);
464 if let Some(remote_participant) =
465 this.remote_participants.get_mut(&participant.user_id)
466 {
467 remote_participant.projects = participant.projects;
468 remote_participant.peer_id = peer_id;
469 if location != remote_participant.location {
470 remote_participant.location = location;
471 cx.emit(Event::ParticipantLocationChanged {
472 participant_id: peer_id,
473 });
474 }
475 } else {
476 this.remote_participants.insert(
477 participant.user_id,
478 RemoteParticipant {
479 user: user.clone(),
480 peer_id,
481 projects: participant.projects,
482 location,
483 tracks: Default::default(),
484 },
485 );
486
487 if let Some(live_kit) = this.live_kit.as_ref() {
488 let tracks =
489 live_kit.room.remote_video_tracks(&peer_id.to_string());
490 for track in tracks {
491 this.remote_video_track_updated(
492 RemoteVideoTrackUpdate::Subscribed(track),
493 cx,
494 )
495 .log_err();
496 }
497 }
498 }
499 }
500
501 this.remote_participants.retain(|user_id, participant| {
502 if this.participant_user_ids.contains(user_id) {
503 true
504 } else {
505 for project in &participant.projects {
506 cx.emit(Event::RemoteProjectUnshared {
507 project_id: project.id,
508 });
509 }
510 false
511 }
512 });
513 }
514
515 if let Some(pending_participants) = pending_participants.log_err() {
516 this.pending_participants = pending_participants;
517 for participant in &this.pending_participants {
518 this.participant_user_ids.insert(participant.id);
519 }
520 }
521
522 this.pending_room_update.take();
523 if this.should_leave() {
524 log::info!("room is empty, leaving");
525 let _ = this.leave(cx);
526 }
527
528 this.check_invariants();
529 cx.notify();
530 });
531 }));
532
533 cx.notify();
534 Ok(())
535 }
536
537 fn remote_video_track_updated(
538 &mut self,
539 change: RemoteVideoTrackUpdate,
540 cx: &mut ModelContext<Self>,
541 ) -> Result<()> {
542 match change {
543 RemoteVideoTrackUpdate::Subscribed(track) => {
544 let user_id = track.publisher_id().parse()?;
545 let track_id = track.sid().to_string();
546 let participant = self
547 .remote_participants
548 .get_mut(&user_id)
549 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
550 participant.tracks.insert(
551 track_id.clone(),
552 Arc::new(RemoteVideoTrack {
553 live_kit_track: track,
554 }),
555 );
556 cx.emit(Event::RemoteVideoTracksChanged {
557 participant_id: participant.peer_id,
558 });
559 }
560 RemoteVideoTrackUpdate::Unsubscribed {
561 publisher_id,
562 track_id,
563 } => {
564 let user_id = publisher_id.parse()?;
565 let participant = self
566 .remote_participants
567 .get_mut(&user_id)
568 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
569 participant.tracks.remove(&track_id);
570 cx.emit(Event::RemoteVideoTracksChanged {
571 participant_id: participant.peer_id,
572 });
573 }
574 }
575
576 cx.notify();
577 Ok(())
578 }
579
580 fn check_invariants(&self) {
581 #[cfg(any(test, feature = "test-support"))]
582 {
583 for participant in self.remote_participants.values() {
584 assert!(self.participant_user_ids.contains(&participant.user.id));
585 assert_ne!(participant.user.id, self.client.user_id().unwrap());
586 }
587
588 for participant in &self.pending_participants {
589 assert!(self.participant_user_ids.contains(&participant.id));
590 assert_ne!(participant.id, self.client.user_id().unwrap());
591 }
592
593 assert_eq!(
594 self.participant_user_ids.len(),
595 self.remote_participants.len() + self.pending_participants.len()
596 );
597 }
598 }
599
600 pub(crate) fn call(
601 &mut self,
602 called_user_id: u64,
603 initial_project_id: Option<u64>,
604 cx: &mut ModelContext<Self>,
605 ) -> Task<Result<()>> {
606 if self.status.is_offline() {
607 return Task::ready(Err(anyhow!("room is offline")));
608 }
609
610 cx.notify();
611 let client = self.client.clone();
612 let room_id = self.id;
613 self.pending_call_count += 1;
614 cx.spawn(|this, mut cx| async move {
615 let result = client
616 .request(proto::Call {
617 room_id,
618 called_user_id,
619 initial_project_id,
620 })
621 .await;
622 this.update(&mut cx, |this, cx| {
623 this.pending_call_count -= 1;
624 if this.should_leave() {
625 this.leave(cx)?;
626 }
627 result
628 })?;
629 Ok(())
630 })
631 }
632
633 pub(crate) fn share_project(
634 &mut self,
635 project: ModelHandle<Project>,
636 cx: &mut ModelContext<Self>,
637 ) -> Task<Result<u64>> {
638 if let Some(project_id) = project.read(cx).remote_id() {
639 return Task::ready(Ok(project_id));
640 }
641
642 let request = self.client.request(proto::ShareProject {
643 room_id: self.id(),
644 worktrees: project
645 .read(cx)
646 .worktrees(cx)
647 .map(|worktree| {
648 let worktree = worktree.read(cx);
649 proto::WorktreeMetadata {
650 id: worktree.id().to_proto(),
651 root_name: worktree.root_name().into(),
652 visible: worktree.is_visible(),
653 abs_path: worktree.abs_path().to_string_lossy().into(),
654 }
655 })
656 .collect(),
657 });
658 cx.spawn(|this, mut cx| async move {
659 let response = request.await?;
660
661 project.update(&mut cx, |project, cx| {
662 project
663 .shared(response.project_id, cx)
664 .detach_and_log_err(cx)
665 });
666
667 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
668 this.update(&mut cx, |this, cx| {
669 let active_project = this.local_participant.active_project.as_ref();
670 if active_project.map_or(false, |location| *location == project) {
671 this.set_location(Some(&project), cx)
672 } else {
673 Task::ready(Ok(()))
674 }
675 })
676 .await?;
677
678 Ok(response.project_id)
679 })
680 }
681
682 pub(crate) fn set_location(
683 &mut self,
684 project: Option<&ModelHandle<Project>>,
685 cx: &mut ModelContext<Self>,
686 ) -> Task<Result<()>> {
687 if self.status.is_offline() {
688 return Task::ready(Err(anyhow!("room is offline")));
689 }
690
691 let client = self.client.clone();
692 let room_id = self.id;
693 let location = if let Some(project) = project {
694 self.local_participant.active_project = Some(project.downgrade());
695 if let Some(project_id) = project.read(cx).remote_id() {
696 proto::participant_location::Variant::SharedProject(
697 proto::participant_location::SharedProject { id: project_id },
698 )
699 } else {
700 proto::participant_location::Variant::UnsharedProject(
701 proto::participant_location::UnsharedProject {},
702 )
703 }
704 } else {
705 self.local_participant.active_project = None;
706 proto::participant_location::Variant::External(proto::participant_location::External {})
707 };
708
709 cx.notify();
710 cx.foreground().spawn(async move {
711 client
712 .request(proto::UpdateParticipantLocation {
713 room_id,
714 location: Some(proto::ParticipantLocation {
715 variant: Some(location),
716 }),
717 })
718 .await?;
719 Ok(())
720 })
721 }
722
723 pub fn is_screen_sharing(&self) -> bool {
724 self.live_kit.as_ref().map_or(false, |live_kit| {
725 !matches!(live_kit.screen_track, ScreenTrack::None)
726 })
727 }
728
729 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
730 if self.status.is_offline() {
731 return Task::ready(Err(anyhow!("room is offline")));
732 } else if self.is_screen_sharing() {
733 return Task::ready(Err(anyhow!("screen was already shared")));
734 }
735
736 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
737 let publish_id = post_inc(&mut live_kit.next_publish_id);
738 live_kit.screen_track = ScreenTrack::Pending { publish_id };
739 cx.notify();
740 (live_kit.room.display_sources(), publish_id)
741 } else {
742 return Task::ready(Err(anyhow!("live-kit was not initialized")));
743 };
744
745 cx.spawn_weak(|this, mut cx| async move {
746 let publish_track = async {
747 let displays = displays.await?;
748 let display = displays
749 .first()
750 .ok_or_else(|| anyhow!("no display found"))?;
751 let track = LocalVideoTrack::screen_share_for_display(&display);
752 this.upgrade(&cx)
753 .ok_or_else(|| anyhow!("room was dropped"))?
754 .read_with(&cx, |this, _| {
755 this.live_kit
756 .as_ref()
757 .map(|live_kit| live_kit.room.publish_video_track(&track))
758 })
759 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
760 .await
761 };
762
763 let publication = publish_track.await;
764 this.upgrade(&cx)
765 .ok_or_else(|| anyhow!("room was dropped"))?
766 .update(&mut cx, |this, cx| {
767 let live_kit = this
768 .live_kit
769 .as_mut()
770 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
771
772 let canceled = if let ScreenTrack::Pending {
773 publish_id: cur_publish_id,
774 } = &live_kit.screen_track
775 {
776 *cur_publish_id != publish_id
777 } else {
778 true
779 };
780
781 match publication {
782 Ok(publication) => {
783 if canceled {
784 live_kit.room.unpublish_track(publication);
785 } else {
786 live_kit.screen_track = ScreenTrack::Published(publication);
787 cx.notify();
788 }
789 Ok(())
790 }
791 Err(error) => {
792 if canceled {
793 Ok(())
794 } else {
795 live_kit.screen_track = ScreenTrack::None;
796 cx.notify();
797 Err(error)
798 }
799 }
800 }
801 })
802 })
803 }
804
805 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
806 if self.status.is_offline() {
807 return Err(anyhow!("room is offline"));
808 }
809
810 let live_kit = self
811 .live_kit
812 .as_mut()
813 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
814 match mem::take(&mut live_kit.screen_track) {
815 ScreenTrack::None => Err(anyhow!("screen was not shared")),
816 ScreenTrack::Pending { .. } => {
817 cx.notify();
818 Ok(())
819 }
820 ScreenTrack::Published(track) => {
821 live_kit.room.unpublish_track(track);
822 cx.notify();
823 Ok(())
824 }
825 }
826 }
827
828 #[cfg(any(test, feature = "test-support"))]
829 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
830 self.live_kit
831 .as_ref()
832 .unwrap()
833 .room
834 .set_display_sources(sources);
835 }
836}
837
/// LiveKit-related state owned by a [`Room`] while it has a LiveKit session.
struct LiveKitRoom {
    /// The underlying LiveKit room.
    room: Arc<live_kit_client::Room>,
    /// State of the local screen share.
    screen_track: ScreenTrack,
    /// Id handed to the next screen-share attempt; used to detect attempts
    /// that were superseded or canceled while publishing.
    next_publish_id: usize,
    /// Task that leaves the room when LiveKit reports a disconnection.
    _maintain_room: Task<()>,
    /// Task that forwards remote video track updates to the room.
    _maintain_tracks: Task<()>,
}
845
846enum ScreenTrack {
847 None,
848 Pending { publish_id: usize },
849 Published(LocalTrackPublication),
850}
851
852impl Default for ScreenTrack {
853 fn default() -> Self {
854 Self::None
855 }
856}
857
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined and connected.
    Online,
    /// The connection was lost and the room is trying to rejoin.
    Rejoining,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}