room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::{FutureExt, StreamExt};
  9use gpui::{
 10    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 11};
 12use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 13use postage::stream::Stream;
 14use project::Project;
 15use std::{mem, sync::Arc, time::Duration};
 16use util::{post_inc, ResultExt, TryFutureExt};
 17
/// How long `Room::maintain_connection` waits, after detecting a client
/// disconnection, for the client to reconnect and rejoin the room before it
/// gives up and leaves the room. Mirrors the client's receive timeout.
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
 19
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// An already-known remote participant reported a location different from
    /// the one previously stored for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track was added to or removed from a remote participant.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not
    /// previously recorded for that participant.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously recorded remote project is no longer listed, or the
    /// participant that shared it is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room (emitted by `Room::leave`).
    Left,
}
 38
/// Client-side model of a call room: its participants, shared projects,
/// optional LiveKit session and connection status.
pub struct Room {
    /// Room id used in `JoinRoom`, `Call`, `ShareProject` and
    /// `UpdateParticipantLocation` requests.
    id: u64,
    /// LiveKit session state; `None` when no connection info was supplied at
    /// construction or after the room has been left.
    live_kit: Option<LiveKitRoom>,
    /// Current connection status of the room.
    status: RoomStatus,
    /// State of the local user in this room.
    local_participant: LocalParticipant,
    /// Remote participants currently in the room, keyed by peer id.
    remote_participants: BTreeMap<proto::PeerId, RemoteParticipant>,
    /// Users listed in the room's `pending_participants`.
    pending_participants: Vec<Arc<User>>,
    /// User ids of every remote and pending participant.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have been sent but not yet answered.
    pending_call_count: usize,
    /// When set, the room leaves itself once `should_leave` finds it empty.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Holds the `RoomUpdated` message-handler subscription; cleared on leave.
    subscriptions: Vec<client::Subscription>,
    /// Task that finishes applying the most recent room update, if one is
    /// still in flight.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; taken when the room is left.
    maintain_connection: Option<Task<Option<()>>>,
}
 55
 56impl Entity for Room {
 57    type Event = Event;
 58
 59    fn release(&mut self, _: &mut MutableAppContext) {
 60        if self.status.is_online() {
 61            log::info!("room was released, sending leave message");
 62            self.client.send(proto::LeaveRoom {}).log_err();
 63        }
 64    }
 65}
 66
 67impl Room {
 68    fn new(
 69        id: u64,
 70        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 71        client: Arc<Client>,
 72        user_store: ModelHandle<UserStore>,
 73        cx: &mut ModelContext<Self>,
 74    ) -> Self {
 75        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 76            let room = live_kit_client::Room::new();
 77            let mut status = room.status();
 78            // Consume the initial status of the room.
 79            let _ = status.try_recv();
 80            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 81                while let Some(status) = status.next().await {
 82                    let this = if let Some(this) = this.upgrade(&cx) {
 83                        this
 84                    } else {
 85                        break;
 86                    };
 87
 88                    if status == live_kit_client::ConnectionState::Disconnected {
 89                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 90                        break;
 91                    }
 92                }
 93            });
 94
 95            let mut track_changes = room.remote_video_track_updates();
 96            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 97                while let Some(track_change) = track_changes.next().await {
 98                    let this = if let Some(this) = this.upgrade(&cx) {
 99                        this
100                    } else {
101                        break;
102                    };
103
104                    this.update(&mut cx, |this, cx| {
105                        this.remote_video_track_updated(track_change, cx).log_err()
106                    });
107                }
108            });
109
110            cx.foreground()
111                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
112                .detach_and_log_err(cx);
113
114            Some(LiveKitRoom {
115                room,
116                screen_track: ScreenTrack::None,
117                next_publish_id: 0,
118                _maintain_room,
119                _maintain_tracks,
120            })
121        } else {
122            None
123        };
124
125        let maintain_connection =
126            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
127
128        Self {
129            id,
130            live_kit: live_kit_room,
131            status: RoomStatus::Online,
132            participant_user_ids: Default::default(),
133            local_participant: Default::default(),
134            remote_participants: Default::default(),
135            pending_participants: Default::default(),
136            pending_call_count: 0,
137            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
138            leave_when_empty: false,
139            pending_room_update: None,
140            client,
141            user_store,
142            maintain_connection: Some(maintain_connection),
143        }
144    }
145
146    pub(crate) fn create(
147        called_user_id: u64,
148        initial_project: Option<ModelHandle<Project>>,
149        client: Arc<Client>,
150        user_store: ModelHandle<UserStore>,
151        cx: &mut MutableAppContext,
152    ) -> Task<Result<ModelHandle<Self>>> {
153        cx.spawn(|mut cx| async move {
154            let response = client.request(proto::CreateRoom {}).await?;
155            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
156            let room = cx.add_model(|cx| {
157                Self::new(
158                    room_proto.id,
159                    response.live_kit_connection_info,
160                    client,
161                    user_store,
162                    cx,
163                )
164            });
165
166            let initial_project_id = if let Some(initial_project) = initial_project {
167                let initial_project_id = room
168                    .update(&mut cx, |room, cx| {
169                        room.share_project(initial_project.clone(), cx)
170                    })
171                    .await?;
172                Some(initial_project_id)
173            } else {
174                None
175            };
176
177            match room
178                .update(&mut cx, |room, cx| {
179                    room.leave_when_empty = true;
180                    room.call(called_user_id, initial_project_id, cx)
181                })
182                .await
183            {
184                Ok(()) => Ok(room),
185                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
186            }
187        })
188    }
189
190    pub(crate) fn join(
191        call: &IncomingCall,
192        client: Arc<Client>,
193        user_store: ModelHandle<UserStore>,
194        cx: &mut MutableAppContext,
195    ) -> Task<Result<ModelHandle<Self>>> {
196        let room_id = call.room_id;
197        cx.spawn(|mut cx| async move {
198            let response = client.request(proto::JoinRoom { id: room_id }).await?;
199            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
200            let room = cx.add_model(|cx| {
201                Self::new(
202                    room_id,
203                    response.live_kit_connection_info,
204                    client,
205                    user_store,
206                    cx,
207                )
208            });
209            room.update(&mut cx, |room, cx| {
210                room.leave_when_empty = true;
211                room.apply_room_update(room_proto, cx)?;
212                anyhow::Ok(())
213            })?;
214            Ok(room)
215        })
216    }
217
218    fn should_leave(&self) -> bool {
219        self.leave_when_empty
220            && self.pending_room_update.is_none()
221            && self.pending_participants.is_empty()
222            && self.remote_participants.is_empty()
223            && self.pending_call_count == 0
224    }
225
226    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
227        if self.status.is_offline() {
228            return Err(anyhow!("room is offline"));
229        }
230
231        cx.notify();
232        cx.emit(Event::Left);
233        log::info!("leaving room");
234        self.status = RoomStatus::Offline;
235        self.remote_participants.clear();
236        self.pending_participants.clear();
237        self.participant_user_ids.clear();
238        self.subscriptions.clear();
239        self.live_kit.take();
240        self.pending_room_update.take();
241        self.maintain_connection.take();
242        self.client.send(proto::LeaveRoom {})?;
243        Ok(())
244    }
245
    /// Watches the client's connection status and tries to rejoin the room
    /// whenever a disconnection is detected.
    ///
    /// While the connection stays healthy this loops indefinitely. On a
    /// disconnection the room is marked `Rejoining`, and the task waits up to
    /// `RECONNECT_TIMEOUT` for the client to reconnect and for a `JoinRoom`
    /// request to succeed. On success the loop resumes; otherwise the room is
    /// left and an error is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the room model has been dropped when a
    /// disconnection is detected, or if reconnection fails or times out.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // A status stream that has ended counts as "not connected".
            let is_connected = client_status
                .next()
                .await
                .map_or(false, |s| s.is_connected());
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");
                // Flag the room as rejoining and grab its id for the rejoin request.
                let room_id = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                        this.id
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once the room has been rejoined, or `false`
                    // when the status stream ends or every rejoin attempt has failed.
                    let client_reconnection = async {
                        // Only a failed rejoin consumes an attempt; status changes
                        // that are not "connected" are simply skipped.
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            if let Some(status) = client_status.next().await {
                                if status.is_connected() {
                                    log::info!("client reconnected, attempting to rejoin room");
                                    let rejoin_room = async {
                                        let response =
                                            client.request(proto::JoinRoom { id: room_id }).await?;
                                        let room_proto =
                                            response.room.ok_or_else(|| anyhow!("invalid room"))?;
                                        this.upgrade(&cx)
                                            .ok_or_else(|| anyhow!("room was dropped"))?
                                            .update(&mut cx, |this, cx| {
                                                this.status = RoomStatus::Online;
                                                this.apply_room_update(room_proto, cx)
                                            })?;
                                        anyhow::Ok(())
                                    };

                                    if rejoin_room.await.log_err().is_some() {
                                        return true;
                                    } else {
                                        remaining_attempts -= 1;
                                    }
                                }
                            } else {
                                // The status stream ended; no reconnection can be observed.
                                return false;
                            }
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Race the reconnection against the timeout; the reconnection
                    // branch is listed first in the biased select.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                // The client failed to re-establish a connection to the server
                // or an error occurred while trying to re-join the room. Either way
                // we leave the room and return an error.
                if let Some(this) = this.upgrade(&cx) {
                    log::info!("reconnection failed, leaving room");
                    // An error from `leave` is deliberately ignored here.
                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
                }
                return Err(anyhow!(
                    "can't reconnect to room: client failed to re-establish connection"
                ));
            }
        }
    }
339
    /// Returns this room's id.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }

    /// Returns the local user's participant state.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }

    /// Returns the remote participants currently in the room, keyed by peer id.
    pub fn remote_participants(&self) -> &BTreeMap<proto::PeerId, RemoteParticipant> {
        &self.remote_participants
    }

    /// Returns the users listed as pending participants of the room.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }

    /// Returns `true` if `user_id` belongs to a remote or pending participant.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
363
364    async fn handle_room_updated(
365        this: ModelHandle<Self>,
366        envelope: TypedEnvelope<proto::RoomUpdated>,
367        _: Arc<Client>,
368        mut cx: AsyncAppContext,
369    ) -> Result<()> {
370        let room = envelope
371            .payload
372            .room
373            .ok_or_else(|| anyhow!("invalid room"))?;
374        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
375    }
376
    /// Applies a room state received from the server.
    ///
    /// The local user is removed from the participant list, the users for the
    /// remaining and pending participants are requested from the user store,
    /// and a task stored in `pending_room_update` finishes the job once those
    /// requests resolve. That task updates the participant collections, emits
    /// project / location events, and leaves the room if `should_leave` says
    /// it is empty.
    ///
    /// As written, this function itself always returns `Ok(())`; failures
    /// while loading users are logged inside the spawned task.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        // Collect the user ids to load; this happens after the local
        // participant was removed, so the ids line up with `room.participants`.
        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();
        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });
        // Storing the task replaces any earlier, still-running update task.
        // NOTE(review): presumably dropping the old task cancels it — confirm
        // against gpui's `Task` semantics.
        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt from scratch below from the remote and pending lists.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    let mut participant_peer_ids = HashSet::default();
                    // Participants and loaded users are paired by position.
                    // NOTE(review): assumes `get_users` returns users in the
                    // same order as the requested ids — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        // Participants without a peer id are skipped entirely.
                        let Some(peer_id) = participant.peer_id else { continue };
                        this.participant_user_ids.insert(participant.user_id);
                        participant_peer_ids.insert(peer_id);

                        // Project ids we already knew for this peer vs. the ones
                        // in the update, used to derive shared/unshared events.
                        let old_projects = this
                            .remote_participants
                            .get(&peer_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        for unshared_project_id in old_projects.difference(&new_projects) {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // A location that cannot be decoded is treated as external.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
                        {
                            remote_participant.projects = participant.projects;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            this.remote_participants.insert(
                                peer_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    projects: participant.projects,
                                    location,
                                    tracks: Default::default(),
                                },
                            );

                            // Register video tracks LiveKit already has for a
                            // participant we are seeing for the first time.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let tracks =
                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
                                for track in tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants missing from the update and report each
                    // of their projects as unshared.
                    this.remote_participants.retain(|peer_id, participant| {
                        if participant_peer_ids.contains(peer_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Clear the in-flight marker first: `should_leave` requires that
                // no room update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
526
527    fn remote_video_track_updated(
528        &mut self,
529        change: RemoteVideoTrackUpdate,
530        cx: &mut ModelContext<Self>,
531    ) -> Result<()> {
532        match change {
533            RemoteVideoTrackUpdate::Subscribed(track) => {
534                let peer_id = track.publisher_id().parse()?;
535                let track_id = track.sid().to_string();
536                let participant = self
537                    .remote_participants
538                    .get_mut(&peer_id)
539                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
540                participant.tracks.insert(
541                    track_id.clone(),
542                    Arc::new(RemoteVideoTrack {
543                        live_kit_track: track,
544                    }),
545                );
546                cx.emit(Event::RemoteVideoTracksChanged {
547                    participant_id: peer_id,
548                });
549            }
550            RemoteVideoTrackUpdate::Unsubscribed {
551                publisher_id,
552                track_id,
553            } => {
554                let peer_id = publisher_id.parse()?;
555                let participant = self
556                    .remote_participants
557                    .get_mut(&peer_id)
558                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
559                participant.tracks.remove(&track_id);
560                cx.emit(Event::RemoteVideoTracksChanged {
561                    participant_id: peer_id,
562                });
563            }
564        }
565
566        cx.notify();
567        Ok(())
568    }
569
570    fn check_invariants(&self) {
571        #[cfg(any(test, feature = "test-support"))]
572        {
573            for participant in self.remote_participants.values() {
574                assert!(self.participant_user_ids.contains(&participant.user.id));
575                assert_ne!(participant.user.id, self.client.user_id().unwrap());
576            }
577
578            for participant in &self.pending_participants {
579                assert!(self.participant_user_ids.contains(&participant.id));
580                assert_ne!(participant.id, self.client.user_id().unwrap());
581            }
582
583            assert_eq!(
584                self.participant_user_ids.len(),
585                self.remote_participants.len() + self.pending_participants.len()
586            );
587        }
588    }
589
590    pub(crate) fn call(
591        &mut self,
592        called_user_id: u64,
593        initial_project_id: Option<u64>,
594        cx: &mut ModelContext<Self>,
595    ) -> Task<Result<()>> {
596        if self.status.is_offline() {
597            return Task::ready(Err(anyhow!("room is offline")));
598        }
599
600        cx.notify();
601        let client = self.client.clone();
602        let room_id = self.id;
603        self.pending_call_count += 1;
604        cx.spawn(|this, mut cx| async move {
605            let result = client
606                .request(proto::Call {
607                    room_id,
608                    called_user_id,
609                    initial_project_id,
610                })
611                .await;
612            this.update(&mut cx, |this, cx| {
613                this.pending_call_count -= 1;
614                if this.should_leave() {
615                    this.leave(cx)?;
616                }
617                result
618            })?;
619            Ok(())
620        })
621    }
622
623    pub(crate) fn share_project(
624        &mut self,
625        project: ModelHandle<Project>,
626        cx: &mut ModelContext<Self>,
627    ) -> Task<Result<u64>> {
628        if let Some(project_id) = project.read(cx).remote_id() {
629            return Task::ready(Ok(project_id));
630        }
631
632        let request = self.client.request(proto::ShareProject {
633            room_id: self.id(),
634            worktrees: project
635                .read(cx)
636                .worktrees(cx)
637                .map(|worktree| {
638                    let worktree = worktree.read(cx);
639                    proto::WorktreeMetadata {
640                        id: worktree.id().to_proto(),
641                        root_name: worktree.root_name().into(),
642                        visible: worktree.is_visible(),
643                        abs_path: worktree.abs_path().to_string_lossy().into(),
644                    }
645                })
646                .collect(),
647        });
648        cx.spawn(|this, mut cx| async move {
649            let response = request.await?;
650
651            project.update(&mut cx, |project, cx| {
652                project
653                    .shared(response.project_id, cx)
654                    .detach_and_log_err(cx)
655            });
656
657            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
658            this.update(&mut cx, |this, cx| {
659                let active_project = this.local_participant.active_project.as_ref();
660                if active_project.map_or(false, |location| *location == project) {
661                    this.set_location(Some(&project), cx)
662                } else {
663                    Task::ready(Ok(()))
664                }
665            })
666            .await?;
667
668            Ok(response.project_id)
669        })
670    }
671
672    pub(crate) fn set_location(
673        &mut self,
674        project: Option<&ModelHandle<Project>>,
675        cx: &mut ModelContext<Self>,
676    ) -> Task<Result<()>> {
677        if self.status.is_offline() {
678            return Task::ready(Err(anyhow!("room is offline")));
679        }
680
681        let client = self.client.clone();
682        let room_id = self.id;
683        let location = if let Some(project) = project {
684            self.local_participant.active_project = Some(project.downgrade());
685            if let Some(project_id) = project.read(cx).remote_id() {
686                proto::participant_location::Variant::SharedProject(
687                    proto::participant_location::SharedProject { id: project_id },
688                )
689            } else {
690                proto::participant_location::Variant::UnsharedProject(
691                    proto::participant_location::UnsharedProject {},
692                )
693            }
694        } else {
695            self.local_participant.active_project = None;
696            proto::participant_location::Variant::External(proto::participant_location::External {})
697        };
698
699        cx.notify();
700        cx.foreground().spawn(async move {
701            client
702                .request(proto::UpdateParticipantLocation {
703                    room_id,
704                    location: Some(proto::ParticipantLocation {
705                        variant: Some(location),
706                    }),
707                })
708                .await?;
709            Ok(())
710        })
711    }
712
713    pub fn is_screen_sharing(&self) -> bool {
714        self.live_kit.as_ref().map_or(false, |live_kit| {
715            !matches!(live_kit.screen_track, ScreenTrack::None)
716        })
717    }
718
719    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
720        if self.status.is_offline() {
721            return Task::ready(Err(anyhow!("room is offline")));
722        } else if self.is_screen_sharing() {
723            return Task::ready(Err(anyhow!("screen was already shared")));
724        }
725
726        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
727            let publish_id = post_inc(&mut live_kit.next_publish_id);
728            live_kit.screen_track = ScreenTrack::Pending { publish_id };
729            cx.notify();
730            (live_kit.room.display_sources(), publish_id)
731        } else {
732            return Task::ready(Err(anyhow!("live-kit was not initialized")));
733        };
734
735        cx.spawn_weak(|this, mut cx| async move {
736            let publish_track = async {
737                let displays = displays.await?;
738                let display = displays
739                    .first()
740                    .ok_or_else(|| anyhow!("no display found"))?;
741                let track = LocalVideoTrack::screen_share_for_display(&display);
742                this.upgrade(&cx)
743                    .ok_or_else(|| anyhow!("room was dropped"))?
744                    .read_with(&cx, |this, _| {
745                        this.live_kit
746                            .as_ref()
747                            .map(|live_kit| live_kit.room.publish_video_track(&track))
748                    })
749                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
750                    .await
751            };
752
753            let publication = publish_track.await;
754            this.upgrade(&cx)
755                .ok_or_else(|| anyhow!("room was dropped"))?
756                .update(&mut cx, |this, cx| {
757                    let live_kit = this
758                        .live_kit
759                        .as_mut()
760                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
761
762                    let canceled = if let ScreenTrack::Pending {
763                        publish_id: cur_publish_id,
764                    } = &live_kit.screen_track
765                    {
766                        *cur_publish_id != publish_id
767                    } else {
768                        true
769                    };
770
771                    match publication {
772                        Ok(publication) => {
773                            if canceled {
774                                live_kit.room.unpublish_track(publication);
775                            } else {
776                                live_kit.screen_track = ScreenTrack::Published(publication);
777                                cx.notify();
778                            }
779                            Ok(())
780                        }
781                        Err(error) => {
782                            if canceled {
783                                Ok(())
784                            } else {
785                                live_kit.screen_track = ScreenTrack::None;
786                                cx.notify();
787                                Err(error)
788                            }
789                        }
790                    }
791                })
792        })
793    }
794
795    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
796        if self.status.is_offline() {
797            return Err(anyhow!("room is offline"));
798        }
799
800        let live_kit = self
801            .live_kit
802            .as_mut()
803            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
804        match mem::take(&mut live_kit.screen_track) {
805            ScreenTrack::None => Err(anyhow!("screen was not shared")),
806            ScreenTrack::Pending { .. } => {
807                cx.notify();
808                Ok(())
809            }
810            ScreenTrack::Published(track) => {
811                live_kit.room.unpublish_track(track);
812                cx.notify();
813                Ok(())
814            }
815        }
816    }
817
818    #[cfg(any(test, feature = "test-support"))]
819    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
820        self.live_kit
821            .as_ref()
822            .unwrap()
823            .room
824            .set_display_sources(sources);
825    }
826}
827
/// LiveKit-related state owned by a [`Room`].
struct LiveKitRoom {
    /// Handle to the underlying LiveKit room.
    room: Arc<live_kit_client::Room>,
    /// Current state of the local screen share.
    screen_track: ScreenTrack,
    /// Counter used to give each screen-share attempt a distinct publish id.
    next_publish_id: usize,
    /// Task spawned in `Room::new` that leaves the room when LiveKit reports
    /// a disconnection.
    /// NOTE(review): presumably stored so the task lives as long as this
    /// struct — confirm against gpui's `Task` drop semantics.
    _maintain_room: Task<()>,
    /// Task spawned in `Room::new` that forwards remote video track updates
    /// to the room.
    _maintain_tracks: Task<()>,
}
835
836enum ScreenTrack {
837    None,
838    Pending { publish_id: usize },
839    Published(LocalTrackPublication),
840}
841
842impl Default for ScreenTrack {
843    fn default() -> Self {
844        Self::None
845    }
846}
847
/// Connection status of a room as seen by the local client.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected and receiving updates.
    Online,
    /// The client connection was lost and the room is trying to rejoin.
    Rejoining,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, Self::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; a rejoining room is
    /// neither online nor offline.
    pub fn is_online(&self) -> bool {
        matches!(self, Self::Online)
    }
}