room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::{FutureExt, StreamExt};
  9use gpui::{
 10    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 11};
 12use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 13use postage::stream::Stream;
 14use project::Project;
 15use std::{mem, sync::Arc, time::Duration};
 16use util::{post_inc, ResultExt};
 17
 18pub const RECONNECTION_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
 19
 20#[derive(Clone, Debug, PartialEq, Eq)]
 21pub enum Event {
 22    ParticipantLocationChanged {
 23        participant_id: PeerId,
 24    },
 25    RemoteVideoTracksChanged {
 26        participant_id: PeerId,
 27    },
 28    RemoteProjectShared {
 29        owner: Arc<User>,
 30        project_id: u64,
 31        worktree_root_names: Vec<String>,
 32    },
 33    RemoteProjectUnshared {
 34        project_id: u64,
 35    },
 36    Left,
 37}
 38
 39pub struct Room {
 40    id: u64,
 41    live_kit: Option<LiveKitRoom>,
 42    status: RoomStatus,
 43    local_participant: LocalParticipant,
 44    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 45    pending_participants: Vec<Arc<User>>,
 46    participant_user_ids: HashSet<u64>,
 47    pending_call_count: usize,
 48    leave_when_empty: bool,
 49    client: Arc<Client>,
 50    user_store: ModelHandle<UserStore>,
 51    subscriptions: Vec<client::Subscription>,
 52    pending_room_update: Option<Task<()>>,
 53    _maintain_connection: Task<Result<()>>,
 54}
 55
 56impl Entity for Room {
 57    type Event = Event;
 58
 59    fn release(&mut self, _: &mut MutableAppContext) {
 60        if self.status.is_online() {
 61            self.client.send(proto::LeaveRoom {}).log_err();
 62        }
 63    }
 64}
 65
 66impl Room {
 67    fn new(
 68        id: u64,
 69        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 70        client: Arc<Client>,
 71        user_store: ModelHandle<UserStore>,
 72        cx: &mut ModelContext<Self>,
 73    ) -> Self {
 74        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 75            let room = live_kit_client::Room::new();
 76            let mut status = room.status();
 77            // Consume the initial status of the room.
 78            let _ = status.try_recv();
 79            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 80                while let Some(status) = status.next().await {
 81                    let this = if let Some(this) = this.upgrade(&cx) {
 82                        this
 83                    } else {
 84                        break;
 85                    };
 86
 87                    if status == live_kit_client::ConnectionState::Disconnected {
 88                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 89                        break;
 90                    }
 91                }
 92            });
 93
 94            let mut track_changes = room.remote_video_track_updates();
 95            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 96                while let Some(track_change) = track_changes.next().await {
 97                    let this = if let Some(this) = this.upgrade(&cx) {
 98                        this
 99                    } else {
100                        break;
101                    };
102
103                    this.update(&mut cx, |this, cx| {
104                        this.remote_video_track_updated(track_change, cx).log_err()
105                    });
106                }
107            });
108
109            cx.foreground()
110                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
111                .detach_and_log_err(cx);
112
113            Some(LiveKitRoom {
114                room,
115                screen_track: ScreenTrack::None,
116                next_publish_id: 0,
117                _maintain_room,
118                _maintain_tracks,
119            })
120        } else {
121            None
122        };
123
124        let _maintain_connection =
125            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
126
127        Self {
128            id,
129            live_kit: live_kit_room,
130            status: RoomStatus::Online,
131            participant_user_ids: Default::default(),
132            local_participant: Default::default(),
133            remote_participants: Default::default(),
134            pending_participants: Default::default(),
135            pending_call_count: 0,
136            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
137            leave_when_empty: false,
138            pending_room_update: None,
139            client,
140            user_store,
141            _maintain_connection,
142        }
143    }
144
145    pub(crate) fn create(
146        called_user_id: u64,
147        initial_project: Option<ModelHandle<Project>>,
148        client: Arc<Client>,
149        user_store: ModelHandle<UserStore>,
150        cx: &mut MutableAppContext,
151    ) -> Task<Result<ModelHandle<Self>>> {
152        cx.spawn(|mut cx| async move {
153            let response = client.request(proto::CreateRoom {}).await?;
154            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
155            let room = cx.add_model(|cx| {
156                Self::new(
157                    room_proto.id,
158                    response.live_kit_connection_info,
159                    client,
160                    user_store,
161                    cx,
162                )
163            });
164
165            let initial_project_id = if let Some(initial_project) = initial_project {
166                let initial_project_id = room
167                    .update(&mut cx, |room, cx| {
168                        room.share_project(initial_project.clone(), cx)
169                    })
170                    .await?;
171                Some(initial_project_id)
172            } else {
173                None
174            };
175
176            match room
177                .update(&mut cx, |room, cx| {
178                    room.leave_when_empty = true;
179                    room.call(called_user_id, initial_project_id, cx)
180                })
181                .await
182            {
183                Ok(()) => Ok(room),
184                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
185            }
186        })
187    }
188
189    pub(crate) fn join(
190        call: &IncomingCall,
191        client: Arc<Client>,
192        user_store: ModelHandle<UserStore>,
193        cx: &mut MutableAppContext,
194    ) -> Task<Result<ModelHandle<Self>>> {
195        let room_id = call.room_id;
196        cx.spawn(|mut cx| async move {
197            let response = client.request(proto::JoinRoom { id: room_id }).await?;
198            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
199            let room = cx.add_model(|cx| {
200                Self::new(
201                    room_id,
202                    response.live_kit_connection_info,
203                    client,
204                    user_store,
205                    cx,
206                )
207            });
208            room.update(&mut cx, |room, cx| {
209                room.leave_when_empty = true;
210                room.apply_room_update(room_proto, cx)?;
211                anyhow::Ok(())
212            })?;
213            Ok(room)
214        })
215    }
216
217    fn should_leave(&self) -> bool {
218        self.leave_when_empty
219            && self.pending_room_update.is_none()
220            && self.pending_participants.is_empty()
221            && self.remote_participants.is_empty()
222            && self.pending_call_count == 0
223    }
224
225    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
226        if self.status.is_offline() {
227            return Err(anyhow!("room is offline"));
228        }
229
230        cx.notify();
231        cx.emit(Event::Left);
232        self.status = RoomStatus::Offline;
233        self.remote_participants.clear();
234        self.pending_participants.clear();
235        self.participant_user_ids.clear();
236        self.subscriptions.clear();
237        self.live_kit.take();
238        self.client.send(proto::LeaveRoom {})?;
239        Ok(())
240    }
241
242    async fn maintain_connection(
243        this: WeakModelHandle<Self>,
244        client: Arc<Client>,
245        mut cx: AsyncAppContext,
246    ) -> Result<()> {
247        let mut client_status = client.status();
248        loop {
249            let is_connected = client_status
250                .next()
251                .await
252                .map_or(false, |s| s.is_connected());
253            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
254            if !is_connected || client_status.next().await.is_some() {
255                let room_id = this
256                    .upgrade(&cx)
257                    .ok_or_else(|| anyhow!("room was dropped"))?
258                    .update(&mut cx, |this, cx| {
259                        this.status = RoomStatus::Rejoining;
260                        cx.notify();
261                        this.id
262                    });
263
264                // Wait for client to re-establish a connection to the server.
265                let mut reconnection_timeout = cx.background().timer(RECONNECTION_TIMEOUT).fuse();
266                let client_reconnection = async {
267                    loop {
268                        if let Some(status) = client_status.next().await {
269                            if status.is_connected() {
270                                return true;
271                            }
272                        } else {
273                            return false;
274                        }
275                    }
276                }
277                .fuse();
278                futures::pin_mut!(client_reconnection);
279
280                futures::select_biased! {
281                    reconnected = client_reconnection => {
282                        if reconnected {
283                            // Client managed to reconnect to the server. Now attempt to join the room.
284                            let rejoin_room = async {
285                                let response = client.request(proto::JoinRoom { id: room_id }).await?;
286                                let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
287                                this.upgrade(&cx)
288                                    .ok_or_else(|| anyhow!("room was dropped"))?
289                                    .update(&mut cx, |this, cx| {
290                                        this.status = RoomStatus::Online;
291                                        this.apply_room_update(room_proto, cx)
292                                    })?;
293                                anyhow::Ok(())
294                            };
295
296                            // If we successfully joined the room, go back around the loop
297                            // waiting for future connection status changes.
298                            if rejoin_room.await.log_err().is_some() {
299                                continue;
300                            }
301                        }
302                    }
303                    _ = reconnection_timeout => {}
304                }
305
306                // The client failed to re-establish a connection to the server
307                // or an error occurred while trying to re-join the room. Either way
308                // we leave the room and return an error.
309                if let Some(this) = this.upgrade(&cx) {
310                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
311                }
312                return Err(anyhow!(
313                    "can't reconnect to room: client failed to re-establish connection"
314                ));
315            }
316        }
317    }
318
319    pub fn id(&self) -> u64 {
320        self.id
321    }
322
323    pub fn status(&self) -> RoomStatus {
324        self.status
325    }
326
327    pub fn local_participant(&self) -> &LocalParticipant {
328        &self.local_participant
329    }
330
331    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
332        &self.remote_participants
333    }
334
335    pub fn pending_participants(&self) -> &[Arc<User>] {
336        &self.pending_participants
337    }
338
339    pub fn contains_participant(&self, user_id: u64) -> bool {
340        self.participant_user_ids.contains(&user_id)
341    }
342
343    async fn handle_room_updated(
344        this: ModelHandle<Self>,
345        envelope: TypedEnvelope<proto::RoomUpdated>,
346        _: Arc<Client>,
347        mut cx: AsyncAppContext,
348    ) -> Result<()> {
349        let room = envelope
350            .payload
351            .room
352            .ok_or_else(|| anyhow!("invalid room"))?;
353        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
354    }
355
356    fn apply_room_update(
357        &mut self,
358        mut room: proto::Room,
359        cx: &mut ModelContext<Self>,
360    ) -> Result<()> {
361        // Filter ourselves out from the room's participants.
362        let local_participant_ix = room
363            .participants
364            .iter()
365            .position(|participant| Some(participant.user_id) == self.client.user_id());
366        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
367
368        let pending_participant_user_ids = room
369            .pending_participants
370            .iter()
371            .map(|p| p.user_id)
372            .collect::<Vec<_>>();
373        let remote_participant_user_ids = room
374            .participants
375            .iter()
376            .map(|p| p.user_id)
377            .collect::<Vec<_>>();
378        let (remote_participants, pending_participants) =
379            self.user_store.update(cx, move |user_store, cx| {
380                (
381                    user_store.get_users(remote_participant_user_ids, cx),
382                    user_store.get_users(pending_participant_user_ids, cx),
383                )
384            });
385        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
386            let (remote_participants, pending_participants) =
387                futures::join!(remote_participants, pending_participants);
388
389            this.update(&mut cx, |this, cx| {
390                this.participant_user_ids.clear();
391
392                if let Some(participant) = local_participant {
393                    this.local_participant.projects = participant.projects;
394                } else {
395                    this.local_participant.projects.clear();
396                }
397
398                if let Some(participants) = remote_participants.log_err() {
399                    let mut participant_peer_ids = HashSet::default();
400                    for (participant, user) in room.participants.into_iter().zip(participants) {
401                        let peer_id = PeerId(participant.peer_id);
402                        this.participant_user_ids.insert(participant.user_id);
403                        participant_peer_ids.insert(peer_id);
404
405                        let old_projects = this
406                            .remote_participants
407                            .get(&peer_id)
408                            .into_iter()
409                            .flat_map(|existing| &existing.projects)
410                            .map(|project| project.id)
411                            .collect::<HashSet<_>>();
412                        let new_projects = participant
413                            .projects
414                            .iter()
415                            .map(|project| project.id)
416                            .collect::<HashSet<_>>();
417
418                        for project in &participant.projects {
419                            if !old_projects.contains(&project.id) {
420                                cx.emit(Event::RemoteProjectShared {
421                                    owner: user.clone(),
422                                    project_id: project.id,
423                                    worktree_root_names: project.worktree_root_names.clone(),
424                                });
425                            }
426                        }
427
428                        for unshared_project_id in old_projects.difference(&new_projects) {
429                            cx.emit(Event::RemoteProjectUnshared {
430                                project_id: *unshared_project_id,
431                            });
432                        }
433
434                        let location = ParticipantLocation::from_proto(participant.location)
435                            .unwrap_or(ParticipantLocation::External);
436                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
437                        {
438                            remote_participant.projects = participant.projects;
439                            if location != remote_participant.location {
440                                remote_participant.location = location;
441                                cx.emit(Event::ParticipantLocationChanged {
442                                    participant_id: peer_id,
443                                });
444                            }
445                        } else {
446                            this.remote_participants.insert(
447                                peer_id,
448                                RemoteParticipant {
449                                    user: user.clone(),
450                                    projects: participant.projects,
451                                    location,
452                                    tracks: Default::default(),
453                                },
454                            );
455
456                            if let Some(live_kit) = this.live_kit.as_ref() {
457                                let tracks =
458                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
459                                for track in tracks {
460                                    this.remote_video_track_updated(
461                                        RemoteVideoTrackUpdate::Subscribed(track),
462                                        cx,
463                                    )
464                                    .log_err();
465                                }
466                            }
467                        }
468                    }
469
470                    this.remote_participants.retain(|peer_id, participant| {
471                        if participant_peer_ids.contains(peer_id) {
472                            true
473                        } else {
474                            for project in &participant.projects {
475                                cx.emit(Event::RemoteProjectUnshared {
476                                    project_id: project.id,
477                                });
478                            }
479                            false
480                        }
481                    });
482                }
483
484                if let Some(pending_participants) = pending_participants.log_err() {
485                    this.pending_participants = pending_participants;
486                    for participant in &this.pending_participants {
487                        this.participant_user_ids.insert(participant.id);
488                    }
489                }
490
491                this.pending_room_update.take();
492                if this.should_leave() {
493                    let _ = this.leave(cx);
494                }
495
496                this.check_invariants();
497                cx.notify();
498            });
499        }));
500
501        cx.notify();
502        Ok(())
503    }
504
505    fn remote_video_track_updated(
506        &mut self,
507        change: RemoteVideoTrackUpdate,
508        cx: &mut ModelContext<Self>,
509    ) -> Result<()> {
510        match change {
511            RemoteVideoTrackUpdate::Subscribed(track) => {
512                let peer_id = PeerId(track.publisher_id().parse()?);
513                let track_id = track.sid().to_string();
514                let participant = self
515                    .remote_participants
516                    .get_mut(&peer_id)
517                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
518                participant.tracks.insert(
519                    track_id.clone(),
520                    Arc::new(RemoteVideoTrack {
521                        live_kit_track: track,
522                    }),
523                );
524                cx.emit(Event::RemoteVideoTracksChanged {
525                    participant_id: peer_id,
526                });
527            }
528            RemoteVideoTrackUpdate::Unsubscribed {
529                publisher_id,
530                track_id,
531            } => {
532                let peer_id = PeerId(publisher_id.parse()?);
533                let participant = self
534                    .remote_participants
535                    .get_mut(&peer_id)
536                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
537                participant.tracks.remove(&track_id);
538                cx.emit(Event::RemoteVideoTracksChanged {
539                    participant_id: peer_id,
540                });
541            }
542        }
543
544        cx.notify();
545        Ok(())
546    }
547
548    fn check_invariants(&self) {
549        #[cfg(any(test, feature = "test-support"))]
550        {
551            for participant in self.remote_participants.values() {
552                assert!(self.participant_user_ids.contains(&participant.user.id));
553                assert_ne!(participant.user.id, self.client.user_id().unwrap());
554            }
555
556            for participant in &self.pending_participants {
557                assert!(self.participant_user_ids.contains(&participant.id));
558                assert_ne!(participant.id, self.client.user_id().unwrap());
559            }
560
561            assert_eq!(
562                self.participant_user_ids.len(),
563                self.remote_participants.len() + self.pending_participants.len()
564            );
565        }
566    }
567
568    pub(crate) fn call(
569        &mut self,
570        called_user_id: u64,
571        initial_project_id: Option<u64>,
572        cx: &mut ModelContext<Self>,
573    ) -> Task<Result<()>> {
574        if self.status.is_offline() {
575            return Task::ready(Err(anyhow!("room is offline")));
576        }
577
578        cx.notify();
579        let client = self.client.clone();
580        let room_id = self.id;
581        self.pending_call_count += 1;
582        cx.spawn(|this, mut cx| async move {
583            let result = client
584                .request(proto::Call {
585                    room_id,
586                    called_user_id,
587                    initial_project_id,
588                })
589                .await;
590            this.update(&mut cx, |this, cx| {
591                this.pending_call_count -= 1;
592                if this.should_leave() {
593                    this.leave(cx)?;
594                }
595                result
596            })?;
597            Ok(())
598        })
599    }
600
601    pub(crate) fn share_project(
602        &mut self,
603        project: ModelHandle<Project>,
604        cx: &mut ModelContext<Self>,
605    ) -> Task<Result<u64>> {
606        if let Some(project_id) = project.read(cx).remote_id() {
607            return Task::ready(Ok(project_id));
608        }
609
610        let request = self.client.request(proto::ShareProject {
611            room_id: self.id(),
612            worktrees: project
613                .read(cx)
614                .worktrees(cx)
615                .map(|worktree| {
616                    let worktree = worktree.read(cx);
617                    proto::WorktreeMetadata {
618                        id: worktree.id().to_proto(),
619                        root_name: worktree.root_name().into(),
620                        visible: worktree.is_visible(),
621                        abs_path: worktree.abs_path().to_string_lossy().into(),
622                    }
623                })
624                .collect(),
625        });
626        cx.spawn(|this, mut cx| async move {
627            let response = request.await?;
628
629            project.update(&mut cx, |project, cx| {
630                project
631                    .shared(response.project_id, cx)
632                    .detach_and_log_err(cx)
633            });
634
635            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
636            this.update(&mut cx, |this, cx| {
637                let active_project = this.local_participant.active_project.as_ref();
638                if active_project.map_or(false, |location| *location == project) {
639                    this.set_location(Some(&project), cx)
640                } else {
641                    Task::ready(Ok(()))
642                }
643            })
644            .await?;
645
646            Ok(response.project_id)
647        })
648    }
649
650    pub(crate) fn set_location(
651        &mut self,
652        project: Option<&ModelHandle<Project>>,
653        cx: &mut ModelContext<Self>,
654    ) -> Task<Result<()>> {
655        if self.status.is_offline() {
656            return Task::ready(Err(anyhow!("room is offline")));
657        }
658
659        let client = self.client.clone();
660        let room_id = self.id;
661        let location = if let Some(project) = project {
662            self.local_participant.active_project = Some(project.downgrade());
663            if let Some(project_id) = project.read(cx).remote_id() {
664                proto::participant_location::Variant::SharedProject(
665                    proto::participant_location::SharedProject { id: project_id },
666                )
667            } else {
668                proto::participant_location::Variant::UnsharedProject(
669                    proto::participant_location::UnsharedProject {},
670                )
671            }
672        } else {
673            self.local_participant.active_project = None;
674            proto::participant_location::Variant::External(proto::participant_location::External {})
675        };
676
677        cx.notify();
678        cx.foreground().spawn(async move {
679            client
680                .request(proto::UpdateParticipantLocation {
681                    room_id,
682                    location: Some(proto::ParticipantLocation {
683                        variant: Some(location),
684                    }),
685                })
686                .await?;
687            Ok(())
688        })
689    }
690
691    pub fn is_screen_sharing(&self) -> bool {
692        self.live_kit.as_ref().map_or(false, |live_kit| {
693            !matches!(live_kit.screen_track, ScreenTrack::None)
694        })
695    }
696
697    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
698        if self.status.is_offline() {
699            return Task::ready(Err(anyhow!("room is offline")));
700        } else if self.is_screen_sharing() {
701            return Task::ready(Err(anyhow!("screen was already shared")));
702        }
703
704        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
705            let publish_id = post_inc(&mut live_kit.next_publish_id);
706            live_kit.screen_track = ScreenTrack::Pending { publish_id };
707            cx.notify();
708            (live_kit.room.display_sources(), publish_id)
709        } else {
710            return Task::ready(Err(anyhow!("live-kit was not initialized")));
711        };
712
713        cx.spawn_weak(|this, mut cx| async move {
714            let publish_track = async {
715                let displays = displays.await?;
716                let display = displays
717                    .first()
718                    .ok_or_else(|| anyhow!("no display found"))?;
719                let track = LocalVideoTrack::screen_share_for_display(&display);
720                this.upgrade(&cx)
721                    .ok_or_else(|| anyhow!("room was dropped"))?
722                    .read_with(&cx, |this, _| {
723                        this.live_kit
724                            .as_ref()
725                            .map(|live_kit| live_kit.room.publish_video_track(&track))
726                    })
727                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
728                    .await
729            };
730
731            let publication = publish_track.await;
732            this.upgrade(&cx)
733                .ok_or_else(|| anyhow!("room was dropped"))?
734                .update(&mut cx, |this, cx| {
735                    let live_kit = this
736                        .live_kit
737                        .as_mut()
738                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
739
740                    let canceled = if let ScreenTrack::Pending {
741                        publish_id: cur_publish_id,
742                    } = &live_kit.screen_track
743                    {
744                        *cur_publish_id != publish_id
745                    } else {
746                        true
747                    };
748
749                    match publication {
750                        Ok(publication) => {
751                            if canceled {
752                                live_kit.room.unpublish_track(publication);
753                            } else {
754                                live_kit.screen_track = ScreenTrack::Published(publication);
755                                cx.notify();
756                            }
757                            Ok(())
758                        }
759                        Err(error) => {
760                            if canceled {
761                                Ok(())
762                            } else {
763                                live_kit.screen_track = ScreenTrack::None;
764                                cx.notify();
765                                Err(error)
766                            }
767                        }
768                    }
769                })
770        })
771    }
772
773    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
774        if self.status.is_offline() {
775            return Err(anyhow!("room is offline"));
776        }
777
778        let live_kit = self
779            .live_kit
780            .as_mut()
781            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
782        match mem::take(&mut live_kit.screen_track) {
783            ScreenTrack::None => Err(anyhow!("screen was not shared")),
784            ScreenTrack::Pending { .. } => {
785                cx.notify();
786                Ok(())
787            }
788            ScreenTrack::Published(track) => {
789                live_kit.room.unpublish_track(track);
790                cx.notify();
791                Ok(())
792            }
793        }
794    }
795
796    #[cfg(any(test, feature = "test-support"))]
797    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
798        self.live_kit
799            .as_ref()
800            .unwrap()
801            .room
802            .set_display_sources(sources);
803    }
804}
805
806struct LiveKitRoom {
807    room: Arc<live_kit_client::Room>,
808    screen_track: ScreenTrack,
809    next_publish_id: usize,
810    _maintain_room: Task<()>,
811    _maintain_tracks: Task<()>,
812}
813
814enum ScreenTrack {
815    None,
816    Pending { publish_id: usize },
817    Published(LocalTrackPublication),
818}
819
820impl Default for ScreenTrack {
821    fn default() -> Self {
822        Self::None
823    }
824}
825
826#[derive(Copy, Clone, PartialEq, Eq)]
827pub enum RoomStatus {
828    Online,
829    Rejoining,
830    Offline,
831}
832
833impl RoomStatus {
834    pub fn is_offline(&self) -> bool {
835        matches!(self, RoomStatus::Offline)
836    }
837
838    pub fn is_online(&self) -> bool {
839        matches!(self, RoomStatus::Online)
840    }
841}