room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::{FutureExt, StreamExt};
  9use gpui::{
 10    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 11};
 12use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 13use postage::stream::Stream;
 14use project::Project;
 15use std::{mem, sync::Arc, time::Duration};
 16use util::{post_inc, ResultExt};
 17
/// How long `Room::maintain_connection` waits for the client to reconnect and rejoin the room
/// before giving up and leaving it. Set to the client's receive timeout.
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
 19
/// Events emitted by the [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a location that differs from the stored one.
    ParticipantLocationChanged {
        participant_id: PeerId,
    },
    /// A video track of the given remote participant was subscribed to or unsubscribed from.
    RemoteVideoTracksChanged {
        participant_id: PeerId,
    },
    /// A room update listed a project for a remote participant that was not previously
    /// recorded for that participant.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously recorded project is no longer listed for its participant, or the
    /// participant itself is no longer part of the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room; emitted by `Room::leave`.
    Left,
}
 38
/// Client-side model of a call room: its participants, the optional LiveKit session and the
/// background tasks that keep it in sync with the server.
pub struct Room {
    /// Server-assigned room id, sent with every room-scoped request.
    id: u64,
    /// LiveKit session; `Some` only when connection info was supplied at construction, and
    /// dropped again when the room is left.
    live_kit: Option<LiveKitRoom>,
    /// Whether the room is online, rejoining after a connection change, or offline.
    status: RoomStatus,
    /// State of the local user (shared projects, active project).
    local_participant: LocalParticipant,
    /// Participants currently in the room, excluding the local user, keyed by peer id.
    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
    /// Users listed as pending in the latest applied room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants; rebuilt on every applied room update.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have been sent but not yet answered.
    pending_call_count: usize,
    /// When set, the room leaves itself as soon as `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Holds the `RoomUpdated` message-handler subscription; cleared on leave.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recently received room update; a newer update replaces it.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped on leave.
    maintain_connection: Option<Task<Result<()>>>,
}
 55
 56impl Entity for Room {
 57    type Event = Event;
 58
 59    fn release(&mut self, _: &mut MutableAppContext) {
 60        if self.status.is_online() {
 61            self.client.send(proto::LeaveRoom {}).log_err();
 62        }
 63    }
 64}
 65
 66impl Room {
 67    fn new(
 68        id: u64,
 69        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 70        client: Arc<Client>,
 71        user_store: ModelHandle<UserStore>,
 72        cx: &mut ModelContext<Self>,
 73    ) -> Self {
 74        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 75            let room = live_kit_client::Room::new();
 76            let mut status = room.status();
 77            // Consume the initial status of the room.
 78            let _ = status.try_recv();
 79            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 80                while let Some(status) = status.next().await {
 81                    let this = if let Some(this) = this.upgrade(&cx) {
 82                        this
 83                    } else {
 84                        break;
 85                    };
 86
 87                    if status == live_kit_client::ConnectionState::Disconnected {
 88                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 89                        break;
 90                    }
 91                }
 92            });
 93
 94            let mut track_changes = room.remote_video_track_updates();
 95            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 96                while let Some(track_change) = track_changes.next().await {
 97                    let this = if let Some(this) = this.upgrade(&cx) {
 98                        this
 99                    } else {
100                        break;
101                    };
102
103                    this.update(&mut cx, |this, cx| {
104                        this.remote_video_track_updated(track_change, cx).log_err()
105                    });
106                }
107            });
108
109            cx.foreground()
110                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
111                .detach_and_log_err(cx);
112
113            Some(LiveKitRoom {
114                room,
115                screen_track: ScreenTrack::None,
116                next_publish_id: 0,
117                _maintain_room,
118                _maintain_tracks,
119            })
120        } else {
121            None
122        };
123
124        let maintain_connection =
125            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
126
127        Self {
128            id,
129            live_kit: live_kit_room,
130            status: RoomStatus::Online,
131            participant_user_ids: Default::default(),
132            local_participant: Default::default(),
133            remote_participants: Default::default(),
134            pending_participants: Default::default(),
135            pending_call_count: 0,
136            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
137            leave_when_empty: false,
138            pending_room_update: None,
139            client,
140            user_store,
141            maintain_connection: Some(maintain_connection),
142        }
143    }
144
145    pub(crate) fn create(
146        called_user_id: u64,
147        initial_project: Option<ModelHandle<Project>>,
148        client: Arc<Client>,
149        user_store: ModelHandle<UserStore>,
150        cx: &mut MutableAppContext,
151    ) -> Task<Result<ModelHandle<Self>>> {
152        cx.spawn(|mut cx| async move {
153            let response = client.request(proto::CreateRoom {}).await?;
154            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
155            let room = cx.add_model(|cx| {
156                Self::new(
157                    room_proto.id,
158                    response.live_kit_connection_info,
159                    client,
160                    user_store,
161                    cx,
162                )
163            });
164
165            let initial_project_id = if let Some(initial_project) = initial_project {
166                let initial_project_id = room
167                    .update(&mut cx, |room, cx| {
168                        room.share_project(initial_project.clone(), cx)
169                    })
170                    .await?;
171                Some(initial_project_id)
172            } else {
173                None
174            };
175
176            match room
177                .update(&mut cx, |room, cx| {
178                    room.leave_when_empty = true;
179                    room.call(called_user_id, initial_project_id, cx)
180                })
181                .await
182            {
183                Ok(()) => Ok(room),
184                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
185            }
186        })
187    }
188
189    pub(crate) fn join(
190        call: &IncomingCall,
191        client: Arc<Client>,
192        user_store: ModelHandle<UserStore>,
193        cx: &mut MutableAppContext,
194    ) -> Task<Result<ModelHandle<Self>>> {
195        let room_id = call.room_id;
196        cx.spawn(|mut cx| async move {
197            let response = client.request(proto::JoinRoom { id: room_id }).await?;
198            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
199            let room = cx.add_model(|cx| {
200                Self::new(
201                    room_id,
202                    response.live_kit_connection_info,
203                    client,
204                    user_store,
205                    cx,
206                )
207            });
208            room.update(&mut cx, |room, cx| {
209                room.leave_when_empty = true;
210                room.apply_room_update(room_proto, cx)?;
211                anyhow::Ok(())
212            })?;
213            Ok(room)
214        })
215    }
216
217    fn should_leave(&self) -> bool {
218        self.leave_when_empty
219            && self.pending_room_update.is_none()
220            && self.pending_participants.is_empty()
221            && self.remote_participants.is_empty()
222            && self.pending_call_count == 0
223    }
224
225    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
226        if self.status.is_offline() {
227            return Err(anyhow!("room is offline"));
228        }
229
230        cx.notify();
231        cx.emit(Event::Left);
232        self.status = RoomStatus::Offline;
233        self.remote_participants.clear();
234        self.pending_participants.clear();
235        self.participant_user_ids.clear();
236        self.subscriptions.clear();
237        self.live_kit.take();
238        self.pending_room_update.take();
239        self.maintain_connection.take();
240        self.client.send(proto::LeaveRoom {})?;
241        Ok(())
242    }
243
    /// Watches the client's connection status and tries to rejoin the room after a
    /// connection change.
    ///
    /// Runs until the room cannot be rejoined. On a detected disconnection the room is marked
    /// `Rejoining`. A reconnection attempt then races against `RECONNECT_TIMEOUT`: each time
    /// the client reports a connected status, a `JoinRoom` request is sent and, on success,
    /// the returned room state is applied and the status goes back to `Online`. Up to three
    /// failed rejoin attempts are tolerated.
    ///
    /// # Errors
    ///
    /// Returns an error if the room model has been dropped, or, after leaving the room, if
    /// reconnection failed or timed out.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // A status stream that has ended is treated the same as "not connected".
            let is_connected = client_status
                .next()
                .await
                .map_or(false, |s| s.is_connected());
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                let room_id = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                        this.id
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once the room was rejoined, `false` if the status
                    // stream ended or all attempts were used up.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if let Some(status) = client_status.next().await {
                                if status.is_connected() {
                                    let rejoin_room = async {
                                        let response =
                                            client.request(proto::JoinRoom { id: room_id }).await?;
                                        let room_proto =
                                            response.room.ok_or_else(|| anyhow!("invalid room"))?;
                                        this.upgrade(&cx)
                                            .ok_or_else(|| anyhow!("room was dropped"))?
                                            .update(&mut cx, |this, cx| {
                                                this.status = RoomStatus::Online;
                                                this.apply_room_update(room_proto, cx)
                                            })?;
                                        anyhow::Ok(())
                                    };

                                    // Only a failed rejoin consumes an attempt; statuses that
                                    // are not "connected" are skipped without counting.
                                    if rejoin_room.await.is_ok() {
                                        return true;
                                    } else {
                                        remaining_attempts -= 1;
                                    }
                                }
                            } else {
                                return false;
                            }
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // The reconnection branch is listed first, so `select_biased!` polls it
                    // before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {}
                    }
                }

                // The client failed to re-establish a connection to the server
                // or an error occurred while trying to re-join the room. Either way
                // we leave the room and return an error.
                if let Some(this) = this.upgrade(&cx) {
                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
                }
                return Err(anyhow!(
                    "can't reconnect to room: client failed to re-establish connection"
                ));
            }
        }
    }
327
328    pub fn id(&self) -> u64 {
329        self.id
330    }
331
332    pub fn status(&self) -> RoomStatus {
333        self.status
334    }
335
336    pub fn local_participant(&self) -> &LocalParticipant {
337        &self.local_participant
338    }
339
340    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
341        &self.remote_participants
342    }
343
344    pub fn pending_participants(&self) -> &[Arc<User>] {
345        &self.pending_participants
346    }
347
348    pub fn contains_participant(&self, user_id: u64) -> bool {
349        self.participant_user_ids.contains(&user_id)
350    }
351
352    async fn handle_room_updated(
353        this: ModelHandle<Self>,
354        envelope: TypedEnvelope<proto::RoomUpdated>,
355        _: Arc<Client>,
356        mut cx: AsyncAppContext,
357    ) -> Result<()> {
358        let room = envelope
359            .payload
360            .room
361            .ok_or_else(|| anyhow!("invalid room"))?;
362        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
363    }
364
    /// Reconciles local room state with the server-provided `room`.
    ///
    /// The users behind the participant ids are fetched from the user store first. The actual
    /// reconciliation runs in a spawned task stored in `pending_room_update`, which replaces
    /// any task that was previously stored there. Once users are available, the task:
    ///
    /// - refreshes the local participant's projects,
    /// - adds and updates remote participants, emitting project shared/unshared and location
    ///   events,
    /// - removes remote participants that are no longer listed,
    /// - replaces the pending participants,
    /// - leaves the room if `should_leave` holds afterwards.
    ///
    /// This function itself always returns `Ok(())`. A failure to fetch users is logged
    /// inside the task and the corresponding part of the update is skipped.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();
        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });
        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    let mut participant_peer_ids = HashSet::default();
                    // NOTE(review): the zip pairs each proto participant with a fetched user by
                    // position, which presumes `get_users` preserves request order — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        let peer_id = PeerId(participant.peer_id);
                        this.participant_user_ids.insert(participant.user_id);
                        participant_peer_ids.insert(peer_id);

                        // Project ids previously recorded for this peer (empty for a new peer).
                        let old_projects = this
                            .remote_participants
                            .get(&peer_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        for unshared_project_id in old_projects.difference(&new_projects) {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // A location that cannot be decoded is treated as `External`.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
                        {
                            remote_participant.projects = participant.projects;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            this.remote_participants.insert(
                                peer_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    projects: participant.projects,
                                    location,
                                    tracks: Default::default(),
                                },
                            );

                            // Register video tracks LiveKit already knows for this newly
                            // added participant.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let tracks =
                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
                                for track in tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants missing from this update, reporting their projects
                    // as unshared.
                    this.remote_participants.retain(|peer_id, participant| {
                        if participant_peer_ids.contains(peer_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Clear the task slot first: `should_leave` is false while an update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    let _ = this.leave(cx);
                }

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
513
514    fn remote_video_track_updated(
515        &mut self,
516        change: RemoteVideoTrackUpdate,
517        cx: &mut ModelContext<Self>,
518    ) -> Result<()> {
519        match change {
520            RemoteVideoTrackUpdate::Subscribed(track) => {
521                let peer_id = PeerId(track.publisher_id().parse()?);
522                let track_id = track.sid().to_string();
523                let participant = self
524                    .remote_participants
525                    .get_mut(&peer_id)
526                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
527                participant.tracks.insert(
528                    track_id.clone(),
529                    Arc::new(RemoteVideoTrack {
530                        live_kit_track: track,
531                    }),
532                );
533                cx.emit(Event::RemoteVideoTracksChanged {
534                    participant_id: peer_id,
535                });
536            }
537            RemoteVideoTrackUpdate::Unsubscribed {
538                publisher_id,
539                track_id,
540            } => {
541                let peer_id = PeerId(publisher_id.parse()?);
542                let participant = self
543                    .remote_participants
544                    .get_mut(&peer_id)
545                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
546                participant.tracks.remove(&track_id);
547                cx.emit(Event::RemoteVideoTracksChanged {
548                    participant_id: peer_id,
549                });
550            }
551        }
552
553        cx.notify();
554        Ok(())
555    }
556
557    fn check_invariants(&self) {
558        #[cfg(any(test, feature = "test-support"))]
559        {
560            for participant in self.remote_participants.values() {
561                assert!(self.participant_user_ids.contains(&participant.user.id));
562                assert_ne!(participant.user.id, self.client.user_id().unwrap());
563            }
564
565            for participant in &self.pending_participants {
566                assert!(self.participant_user_ids.contains(&participant.id));
567                assert_ne!(participant.id, self.client.user_id().unwrap());
568            }
569
570            assert_eq!(
571                self.participant_user_ids.len(),
572                self.remote_participants.len() + self.pending_participants.len()
573            );
574        }
575    }
576
577    pub(crate) fn call(
578        &mut self,
579        called_user_id: u64,
580        initial_project_id: Option<u64>,
581        cx: &mut ModelContext<Self>,
582    ) -> Task<Result<()>> {
583        if self.status.is_offline() {
584            return Task::ready(Err(anyhow!("room is offline")));
585        }
586
587        cx.notify();
588        let client = self.client.clone();
589        let room_id = self.id;
590        self.pending_call_count += 1;
591        cx.spawn(|this, mut cx| async move {
592            let result = client
593                .request(proto::Call {
594                    room_id,
595                    called_user_id,
596                    initial_project_id,
597                })
598                .await;
599            this.update(&mut cx, |this, cx| {
600                this.pending_call_count -= 1;
601                if this.should_leave() {
602                    this.leave(cx)?;
603                }
604                result
605            })?;
606            Ok(())
607        })
608    }
609
610    pub(crate) fn share_project(
611        &mut self,
612        project: ModelHandle<Project>,
613        cx: &mut ModelContext<Self>,
614    ) -> Task<Result<u64>> {
615        if let Some(project_id) = project.read(cx).remote_id() {
616            return Task::ready(Ok(project_id));
617        }
618
619        let request = self.client.request(proto::ShareProject {
620            room_id: self.id(),
621            worktrees: project
622                .read(cx)
623                .worktrees(cx)
624                .map(|worktree| {
625                    let worktree = worktree.read(cx);
626                    proto::WorktreeMetadata {
627                        id: worktree.id().to_proto(),
628                        root_name: worktree.root_name().into(),
629                        visible: worktree.is_visible(),
630                        abs_path: worktree.abs_path().to_string_lossy().into(),
631                    }
632                })
633                .collect(),
634        });
635        cx.spawn(|this, mut cx| async move {
636            let response = request.await?;
637
638            project.update(&mut cx, |project, cx| {
639                project
640                    .shared(response.project_id, cx)
641                    .detach_and_log_err(cx)
642            });
643
644            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
645            this.update(&mut cx, |this, cx| {
646                let active_project = this.local_participant.active_project.as_ref();
647                if active_project.map_or(false, |location| *location == project) {
648                    this.set_location(Some(&project), cx)
649                } else {
650                    Task::ready(Ok(()))
651                }
652            })
653            .await?;
654
655            Ok(response.project_id)
656        })
657    }
658
659    pub(crate) fn set_location(
660        &mut self,
661        project: Option<&ModelHandle<Project>>,
662        cx: &mut ModelContext<Self>,
663    ) -> Task<Result<()>> {
664        if self.status.is_offline() {
665            return Task::ready(Err(anyhow!("room is offline")));
666        }
667
668        let client = self.client.clone();
669        let room_id = self.id;
670        let location = if let Some(project) = project {
671            self.local_participant.active_project = Some(project.downgrade());
672            if let Some(project_id) = project.read(cx).remote_id() {
673                proto::participant_location::Variant::SharedProject(
674                    proto::participant_location::SharedProject { id: project_id },
675                )
676            } else {
677                proto::participant_location::Variant::UnsharedProject(
678                    proto::participant_location::UnsharedProject {},
679                )
680            }
681        } else {
682            self.local_participant.active_project = None;
683            proto::participant_location::Variant::External(proto::participant_location::External {})
684        };
685
686        cx.notify();
687        cx.foreground().spawn(async move {
688            client
689                .request(proto::UpdateParticipantLocation {
690                    room_id,
691                    location: Some(proto::ParticipantLocation {
692                        variant: Some(location),
693                    }),
694                })
695                .await?;
696            Ok(())
697        })
698    }
699
700    pub fn is_screen_sharing(&self) -> bool {
701        self.live_kit.as_ref().map_or(false, |live_kit| {
702            !matches!(live_kit.screen_track, ScreenTrack::None)
703        })
704    }
705
706    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
707        if self.status.is_offline() {
708            return Task::ready(Err(anyhow!("room is offline")));
709        } else if self.is_screen_sharing() {
710            return Task::ready(Err(anyhow!("screen was already shared")));
711        }
712
713        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
714            let publish_id = post_inc(&mut live_kit.next_publish_id);
715            live_kit.screen_track = ScreenTrack::Pending { publish_id };
716            cx.notify();
717            (live_kit.room.display_sources(), publish_id)
718        } else {
719            return Task::ready(Err(anyhow!("live-kit was not initialized")));
720        };
721
722        cx.spawn_weak(|this, mut cx| async move {
723            let publish_track = async {
724                let displays = displays.await?;
725                let display = displays
726                    .first()
727                    .ok_or_else(|| anyhow!("no display found"))?;
728                let track = LocalVideoTrack::screen_share_for_display(&display);
729                this.upgrade(&cx)
730                    .ok_or_else(|| anyhow!("room was dropped"))?
731                    .read_with(&cx, |this, _| {
732                        this.live_kit
733                            .as_ref()
734                            .map(|live_kit| live_kit.room.publish_video_track(&track))
735                    })
736                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
737                    .await
738            };
739
740            let publication = publish_track.await;
741            this.upgrade(&cx)
742                .ok_or_else(|| anyhow!("room was dropped"))?
743                .update(&mut cx, |this, cx| {
744                    let live_kit = this
745                        .live_kit
746                        .as_mut()
747                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
748
749                    let canceled = if let ScreenTrack::Pending {
750                        publish_id: cur_publish_id,
751                    } = &live_kit.screen_track
752                    {
753                        *cur_publish_id != publish_id
754                    } else {
755                        true
756                    };
757
758                    match publication {
759                        Ok(publication) => {
760                            if canceled {
761                                live_kit.room.unpublish_track(publication);
762                            } else {
763                                live_kit.screen_track = ScreenTrack::Published(publication);
764                                cx.notify();
765                            }
766                            Ok(())
767                        }
768                        Err(error) => {
769                            if canceled {
770                                Ok(())
771                            } else {
772                                live_kit.screen_track = ScreenTrack::None;
773                                cx.notify();
774                                Err(error)
775                            }
776                        }
777                    }
778                })
779        })
780    }
781
782    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
783        if self.status.is_offline() {
784            return Err(anyhow!("room is offline"));
785        }
786
787        let live_kit = self
788            .live_kit
789            .as_mut()
790            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
791        match mem::take(&mut live_kit.screen_track) {
792            ScreenTrack::None => Err(anyhow!("screen was not shared")),
793            ScreenTrack::Pending { .. } => {
794                cx.notify();
795                Ok(())
796            }
797            ScreenTrack::Published(track) => {
798                live_kit.room.unpublish_track(track);
799                cx.notify();
800                Ok(())
801            }
802        }
803    }
804
805    #[cfg(any(test, feature = "test-support"))]
806    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
807        self.live_kit
808            .as_ref()
809            .unwrap()
810            .room
811            .set_display_sources(sources);
812    }
813}
814
/// LiveKit session state owned by a [`Room`].
struct LiveKitRoom {
    room: Arc<live_kit_client::Room>,
    /// State of the local screen share.
    screen_track: ScreenTrack,
    /// Id handed to the next `share_screen` request; incremented on every request.
    next_publish_id: usize,
    /// Task that leaves the room once LiveKit reports a disconnected state.
    _maintain_room: Task<()>,
    /// Task that forwards remote video track updates to the room.
    _maintain_tracks: Task<()>,
}
822
/// State of the local screen share.
enum ScreenTrack {
    /// The screen is not being shared.
    None,
    /// A share was requested and is still being published; `publish_id` identifies the
    /// request so that a stale completion can be recognized.
    Pending { publish_id: usize },
    /// The screen share track has been published.
    Published(LocalTrackPublication),
}
828
829impl Default for ScreenTrack {
830    fn default() -> Self {
831        Self::None
832    }
833}
834
/// Connection state of a [`Room`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined.
    Online,
    /// A connection change was detected and the room is trying to rejoin.
    Rejoining,
    /// The room has been left.
    Offline,
}
841
842impl RoomStatus {
843    pub fn is_offline(&self) -> bool {
844        matches!(self, RoomStatus::Offline)
845    }
846
847    pub fn is_online(&self) -> bool {
848        matches!(self, RoomStatus::Online)
849    }
850}