room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{
  7    proto::{self, PeerId},
  8    Client, TypedEnvelope, User, UserStore,
  9};
 10use collections::{BTreeMap, HashSet};
 11use futures::{FutureExt, StreamExt};
 12use gpui::{
 13    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 14};
 15use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 16use postage::stream::Stream;
 17use project::Project;
 18use std::{mem, sync::Arc, time::Duration};
 19use util::{post_inc, ResultExt, TryFutureExt};
 20
 21pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
 22
 23#[derive(Clone, Debug, PartialEq, Eq)]
 24pub enum Event {
 25    ParticipantLocationChanged {
 26        participant_id: proto::PeerId,
 27    },
 28    RemoteVideoTracksChanged {
 29        participant_id: proto::PeerId,
 30    },
 31    RemoteProjectShared {
 32        owner: Arc<User>,
 33        project_id: u64,
 34        worktree_root_names: Vec<String>,
 35    },
 36    RemoteProjectUnshared {
 37        project_id: u64,
 38    },
 39    Left,
 40}
 41
 42pub struct Room {
 43    id: u64,
 44    live_kit: Option<LiveKitRoom>,
 45    status: RoomStatus,
 46    local_participant: LocalParticipant,
 47    remote_participants: BTreeMap<u64, RemoteParticipant>,
 48    pending_participants: Vec<Arc<User>>,
 49    participant_user_ids: HashSet<u64>,
 50    pending_call_count: usize,
 51    leave_when_empty: bool,
 52    client: Arc<Client>,
 53    user_store: ModelHandle<UserStore>,
 54    subscriptions: Vec<client::Subscription>,
 55    pending_room_update: Option<Task<()>>,
 56    maintain_connection: Option<Task<Option<()>>>,
 57}
 58
 59impl Entity for Room {
 60    type Event = Event;
 61
 62    fn release(&mut self, _: &mut MutableAppContext) {
 63        if self.status.is_online() {
 64            log::info!("room was released, sending leave message");
 65            self.client.send(proto::LeaveRoom {}).log_err();
 66        }
 67    }
 68}
 69
 70impl Room {
 71    fn new(
 72        id: u64,
 73        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 74        client: Arc<Client>,
 75        user_store: ModelHandle<UserStore>,
 76        cx: &mut ModelContext<Self>,
 77    ) -> Self {
 78        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 79            let room = live_kit_client::Room::new();
 80            let mut status = room.status();
 81            // Consume the initial status of the room.
 82            let _ = status.try_recv();
 83            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 84                while let Some(status) = status.next().await {
 85                    let this = if let Some(this) = this.upgrade(&cx) {
 86                        this
 87                    } else {
 88                        break;
 89                    };
 90
 91                    if status == live_kit_client::ConnectionState::Disconnected {
 92                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 93                        break;
 94                    }
 95                }
 96            });
 97
 98            let mut track_changes = room.remote_video_track_updates();
 99            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
100                while let Some(track_change) = track_changes.next().await {
101                    let this = if let Some(this) = this.upgrade(&cx) {
102                        this
103                    } else {
104                        break;
105                    };
106
107                    this.update(&mut cx, |this, cx| {
108                        this.remote_video_track_updated(track_change, cx).log_err()
109                    });
110                }
111            });
112
113            cx.foreground()
114                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
115                .detach_and_log_err(cx);
116
117            Some(LiveKitRoom {
118                room,
119                screen_track: ScreenTrack::None,
120                next_publish_id: 0,
121                _maintain_room,
122                _maintain_tracks,
123            })
124        } else {
125            None
126        };
127
128        let maintain_connection =
129            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
130
131        Self {
132            id,
133            live_kit: live_kit_room,
134            status: RoomStatus::Online,
135            participant_user_ids: Default::default(),
136            local_participant: Default::default(),
137            remote_participants: Default::default(),
138            pending_participants: Default::default(),
139            pending_call_count: 0,
140            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
141            leave_when_empty: false,
142            pending_room_update: None,
143            client,
144            user_store,
145            maintain_connection: Some(maintain_connection),
146        }
147    }
148
149    pub(crate) fn create(
150        called_user_id: u64,
151        initial_project: Option<ModelHandle<Project>>,
152        client: Arc<Client>,
153        user_store: ModelHandle<UserStore>,
154        cx: &mut MutableAppContext,
155    ) -> Task<Result<ModelHandle<Self>>> {
156        cx.spawn(|mut cx| async move {
157            let response = client.request(proto::CreateRoom {}).await?;
158            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
159            let room = cx.add_model(|cx| {
160                Self::new(
161                    room_proto.id,
162                    response.live_kit_connection_info,
163                    client,
164                    user_store,
165                    cx,
166                )
167            });
168
169            let initial_project_id = if let Some(initial_project) = initial_project {
170                let initial_project_id = room
171                    .update(&mut cx, |room, cx| {
172                        room.share_project(initial_project.clone(), cx)
173                    })
174                    .await?;
175                Some(initial_project_id)
176            } else {
177                None
178            };
179
180            match room
181                .update(&mut cx, |room, cx| {
182                    room.leave_when_empty = true;
183                    room.call(called_user_id, initial_project_id, cx)
184                })
185                .await
186            {
187                Ok(()) => Ok(room),
188                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
189            }
190        })
191    }
192
193    pub(crate) fn join(
194        call: &IncomingCall,
195        client: Arc<Client>,
196        user_store: ModelHandle<UserStore>,
197        cx: &mut MutableAppContext,
198    ) -> Task<Result<ModelHandle<Self>>> {
199        let room_id = call.room_id;
200        cx.spawn(|mut cx| async move {
201            let response = client.request(proto::JoinRoom { id: room_id }).await?;
202            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
203            let room = cx.add_model(|cx| {
204                Self::new(
205                    room_id,
206                    response.live_kit_connection_info,
207                    client,
208                    user_store,
209                    cx,
210                )
211            });
212            room.update(&mut cx, |room, cx| {
213                room.leave_when_empty = true;
214                room.apply_room_update(room_proto, cx)?;
215                anyhow::Ok(())
216            })?;
217            Ok(room)
218        })
219    }
220
221    fn should_leave(&self) -> bool {
222        self.leave_when_empty
223            && self.pending_room_update.is_none()
224            && self.pending_participants.is_empty()
225            && self.remote_participants.is_empty()
226            && self.pending_call_count == 0
227    }
228
229    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
230        if self.status.is_offline() {
231            return Err(anyhow!("room is offline"));
232        }
233
234        cx.notify();
235        cx.emit(Event::Left);
236        log::info!("leaving room");
237        self.status = RoomStatus::Offline;
238        self.remote_participants.clear();
239        self.pending_participants.clear();
240        self.participant_user_ids.clear();
241        self.subscriptions.clear();
242        self.live_kit.take();
243        self.pending_room_update.take();
244        self.maintain_connection.take();
245        self.client.send(proto::LeaveRoom {})?;
246        Ok(())
247    }
248
249    async fn maintain_connection(
250        this: WeakModelHandle<Self>,
251        client: Arc<Client>,
252        mut cx: AsyncAppContext,
253    ) -> Result<()> {
254        let mut client_status = client.status();
255        loop {
256            let is_connected = client_status
257                .next()
258                .await
259                .map_or(false, |s| s.is_connected());
260            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
261            if !is_connected || client_status.next().await.is_some() {
262                log::info!("detected client disconnection");
263                let room_id = this
264                    .upgrade(&cx)
265                    .ok_or_else(|| anyhow!("room was dropped"))?
266                    .update(&mut cx, |this, cx| {
267                        this.status = RoomStatus::Rejoining;
268                        cx.notify();
269                        this.id
270                    });
271
272                // Wait for client to re-establish a connection to the server.
273                {
274                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
275                    let client_reconnection = async {
276                        let mut remaining_attempts = 3;
277                        while remaining_attempts > 0 {
278                            log::info!(
279                                "waiting for client status change, remaining attempts {}",
280                                remaining_attempts
281                            );
282                            if let Some(status) = client_status.next().await {
283                                if status.is_connected() {
284                                    log::info!("client reconnected, attempting to rejoin room");
285                                    let rejoin_room = async {
286                                        let response =
287                                            client.request(proto::JoinRoom { id: room_id }).await?;
288                                        let room_proto =
289                                            response.room.ok_or_else(|| anyhow!("invalid room"))?;
290                                        this.upgrade(&cx)
291                                            .ok_or_else(|| anyhow!("room was dropped"))?
292                                            .update(&mut cx, |this, cx| {
293                                                this.status = RoomStatus::Online;
294                                                this.apply_room_update(room_proto, cx)
295                                            })?;
296                                        anyhow::Ok(())
297                                    };
298
299                                    if rejoin_room.await.log_err().is_some() {
300                                        return true;
301                                    } else {
302                                        remaining_attempts -= 1;
303                                    }
304                                }
305                            } else {
306                                return false;
307                            }
308                        }
309                        false
310                    }
311                    .fuse();
312                    futures::pin_mut!(client_reconnection);
313
314                    futures::select_biased! {
315                        reconnected = client_reconnection => {
316                            if reconnected {
317                                log::info!("successfully reconnected to room");
318                                // If we successfully joined the room, go back around the loop
319                                // waiting for future connection status changes.
320                                continue;
321                            }
322                        }
323                        _ = reconnection_timeout => {
324                            log::info!("room reconnection timeout expired");
325                        }
326                    }
327                }
328
329                // The client failed to re-establish a connection to the server
330                // or an error occurred while trying to re-join the room. Either way
331                // we leave the room and return an error.
332                if let Some(this) = this.upgrade(&cx) {
333                    log::info!("reconnection failed, leaving room");
334                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
335                }
336                return Err(anyhow!(
337                    "can't reconnect to room: client failed to re-establish connection"
338                ));
339            }
340        }
341    }
342
343    pub fn id(&self) -> u64 {
344        self.id
345    }
346
347    pub fn status(&self) -> RoomStatus {
348        self.status
349    }
350
351    pub fn local_participant(&self) -> &LocalParticipant {
352        &self.local_participant
353    }
354
355    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
356        &self.remote_participants
357    }
358
359    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
360        self.remote_participants
361            .values()
362            .find(|p| p.peer_id == peer_id)
363    }
364
365    pub fn pending_participants(&self) -> &[Arc<User>] {
366        &self.pending_participants
367    }
368
369    pub fn contains_participant(&self, user_id: u64) -> bool {
370        self.participant_user_ids.contains(&user_id)
371    }
372
373    async fn handle_room_updated(
374        this: ModelHandle<Self>,
375        envelope: TypedEnvelope<proto::RoomUpdated>,
376        _: Arc<Client>,
377        mut cx: AsyncAppContext,
378    ) -> Result<()> {
379        let room = envelope
380            .payload
381            .room
382            .ok_or_else(|| anyhow!("invalid room"))?;
383        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
384    }
385
386    fn apply_room_update(
387        &mut self,
388        mut room: proto::Room,
389        cx: &mut ModelContext<Self>,
390    ) -> Result<()> {
391        // Filter ourselves out from the room's participants.
392        let local_participant_ix = room
393            .participants
394            .iter()
395            .position(|participant| Some(participant.user_id) == self.client.user_id());
396        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
397
398        let pending_participant_user_ids = room
399            .pending_participants
400            .iter()
401            .map(|p| p.user_id)
402            .collect::<Vec<_>>();
403        let remote_participant_user_ids = room
404            .participants
405            .iter()
406            .map(|p| p.user_id)
407            .collect::<Vec<_>>();
408        let (remote_participants, pending_participants) =
409            self.user_store.update(cx, move |user_store, cx| {
410                (
411                    user_store.get_users(remote_participant_user_ids, cx),
412                    user_store.get_users(pending_participant_user_ids, cx),
413                )
414            });
415        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
416            let (remote_participants, pending_participants) =
417                futures::join!(remote_participants, pending_participants);
418
419            this.update(&mut cx, |this, cx| {
420                this.participant_user_ids.clear();
421
422                if let Some(participant) = local_participant {
423                    this.local_participant.projects = participant.projects;
424                } else {
425                    this.local_participant.projects.clear();
426                }
427
428                if let Some(participants) = remote_participants.log_err() {
429                    for (participant, user) in room.participants.into_iter().zip(participants) {
430                        let Some(peer_id) = participant.peer_id else { continue };
431                        this.participant_user_ids.insert(participant.user_id);
432
433                        let old_projects = this
434                            .remote_participants
435                            .get(&participant.user_id)
436                            .into_iter()
437                            .flat_map(|existing| &existing.projects)
438                            .map(|project| project.id)
439                            .collect::<HashSet<_>>();
440                        let new_projects = participant
441                            .projects
442                            .iter()
443                            .map(|project| project.id)
444                            .collect::<HashSet<_>>();
445
446                        for project in &participant.projects {
447                            if !old_projects.contains(&project.id) {
448                                cx.emit(Event::RemoteProjectShared {
449                                    owner: user.clone(),
450                                    project_id: project.id,
451                                    worktree_root_names: project.worktree_root_names.clone(),
452                                });
453                            }
454                        }
455
456                        for unshared_project_id in old_projects.difference(&new_projects) {
457                            cx.emit(Event::RemoteProjectUnshared {
458                                project_id: *unshared_project_id,
459                            });
460                        }
461
462                        let location = ParticipantLocation::from_proto(participant.location)
463                            .unwrap_or(ParticipantLocation::External);
464                        if let Some(remote_participant) =
465                            this.remote_participants.get_mut(&participant.user_id)
466                        {
467                            remote_participant.projects = participant.projects;
468                            remote_participant.peer_id = peer_id;
469                            if location != remote_participant.location {
470                                remote_participant.location = location;
471                                cx.emit(Event::ParticipantLocationChanged {
472                                    participant_id: peer_id,
473                                });
474                            }
475                        } else {
476                            this.remote_participants.insert(
477                                participant.user_id,
478                                RemoteParticipant {
479                                    user: user.clone(),
480                                    peer_id,
481                                    projects: participant.projects,
482                                    location,
483                                    tracks: Default::default(),
484                                },
485                            );
486
487                            if let Some(live_kit) = this.live_kit.as_ref() {
488                                let tracks =
489                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
490                                for track in tracks {
491                                    this.remote_video_track_updated(
492                                        RemoteVideoTrackUpdate::Subscribed(track),
493                                        cx,
494                                    )
495                                    .log_err();
496                                }
497                            }
498                        }
499                    }
500
501                    this.remote_participants.retain(|user_id, participant| {
502                        if this.participant_user_ids.contains(user_id) {
503                            true
504                        } else {
505                            for project in &participant.projects {
506                                cx.emit(Event::RemoteProjectUnshared {
507                                    project_id: project.id,
508                                });
509                            }
510                            false
511                        }
512                    });
513                }
514
515                if let Some(pending_participants) = pending_participants.log_err() {
516                    this.pending_participants = pending_participants;
517                    for participant in &this.pending_participants {
518                        this.participant_user_ids.insert(participant.id);
519                    }
520                }
521
522                this.pending_room_update.take();
523                if this.should_leave() {
524                    log::info!("room is empty, leaving");
525                    let _ = this.leave(cx);
526                }
527
528                this.check_invariants();
529                cx.notify();
530            });
531        }));
532
533        cx.notify();
534        Ok(())
535    }
536
537    fn remote_video_track_updated(
538        &mut self,
539        change: RemoteVideoTrackUpdate,
540        cx: &mut ModelContext<Self>,
541    ) -> Result<()> {
542        match change {
543            RemoteVideoTrackUpdate::Subscribed(track) => {
544                let user_id = track.publisher_id().parse()?;
545                let track_id = track.sid().to_string();
546                let participant = self
547                    .remote_participants
548                    .get_mut(&user_id)
549                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
550                participant.tracks.insert(
551                    track_id.clone(),
552                    Arc::new(RemoteVideoTrack {
553                        live_kit_track: track,
554                    }),
555                );
556                cx.emit(Event::RemoteVideoTracksChanged {
557                    participant_id: participant.peer_id,
558                });
559            }
560            RemoteVideoTrackUpdate::Unsubscribed {
561                publisher_id,
562                track_id,
563            } => {
564                let user_id = publisher_id.parse()?;
565                let participant = self
566                    .remote_participants
567                    .get_mut(&user_id)
568                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
569                participant.tracks.remove(&track_id);
570                cx.emit(Event::RemoteVideoTracksChanged {
571                    participant_id: participant.peer_id,
572                });
573            }
574        }
575
576        cx.notify();
577        Ok(())
578    }
579
580    fn check_invariants(&self) {
581        #[cfg(any(test, feature = "test-support"))]
582        {
583            for participant in self.remote_participants.values() {
584                assert!(self.participant_user_ids.contains(&participant.user.id));
585                assert_ne!(participant.user.id, self.client.user_id().unwrap());
586            }
587
588            for participant in &self.pending_participants {
589                assert!(self.participant_user_ids.contains(&participant.id));
590                assert_ne!(participant.id, self.client.user_id().unwrap());
591            }
592
593            assert_eq!(
594                self.participant_user_ids.len(),
595                self.remote_participants.len() + self.pending_participants.len()
596            );
597        }
598    }
599
600    pub(crate) fn call(
601        &mut self,
602        called_user_id: u64,
603        initial_project_id: Option<u64>,
604        cx: &mut ModelContext<Self>,
605    ) -> Task<Result<()>> {
606        if self.status.is_offline() {
607            return Task::ready(Err(anyhow!("room is offline")));
608        }
609
610        cx.notify();
611        let client = self.client.clone();
612        let room_id = self.id;
613        self.pending_call_count += 1;
614        cx.spawn(|this, mut cx| async move {
615            let result = client
616                .request(proto::Call {
617                    room_id,
618                    called_user_id,
619                    initial_project_id,
620                })
621                .await;
622            this.update(&mut cx, |this, cx| {
623                this.pending_call_count -= 1;
624                if this.should_leave() {
625                    this.leave(cx)?;
626                }
627                result
628            })?;
629            Ok(())
630        })
631    }
632
633    pub(crate) fn share_project(
634        &mut self,
635        project: ModelHandle<Project>,
636        cx: &mut ModelContext<Self>,
637    ) -> Task<Result<u64>> {
638        if let Some(project_id) = project.read(cx).remote_id() {
639            return Task::ready(Ok(project_id));
640        }
641
642        let request = self.client.request(proto::ShareProject {
643            room_id: self.id(),
644            worktrees: project
645                .read(cx)
646                .worktrees(cx)
647                .map(|worktree| {
648                    let worktree = worktree.read(cx);
649                    proto::WorktreeMetadata {
650                        id: worktree.id().to_proto(),
651                        root_name: worktree.root_name().into(),
652                        visible: worktree.is_visible(),
653                        abs_path: worktree.abs_path().to_string_lossy().into(),
654                    }
655                })
656                .collect(),
657        });
658        cx.spawn(|this, mut cx| async move {
659            let response = request.await?;
660
661            project.update(&mut cx, |project, cx| {
662                project
663                    .shared(response.project_id, cx)
664                    .detach_and_log_err(cx)
665            });
666
667            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
668            this.update(&mut cx, |this, cx| {
669                let active_project = this.local_participant.active_project.as_ref();
670                if active_project.map_or(false, |location| *location == project) {
671                    this.set_location(Some(&project), cx)
672                } else {
673                    Task::ready(Ok(()))
674                }
675            })
676            .await?;
677
678            Ok(response.project_id)
679        })
680    }
681
682    pub(crate) fn set_location(
683        &mut self,
684        project: Option<&ModelHandle<Project>>,
685        cx: &mut ModelContext<Self>,
686    ) -> Task<Result<()>> {
687        if self.status.is_offline() {
688            return Task::ready(Err(anyhow!("room is offline")));
689        }
690
691        let client = self.client.clone();
692        let room_id = self.id;
693        let location = if let Some(project) = project {
694            self.local_participant.active_project = Some(project.downgrade());
695            if let Some(project_id) = project.read(cx).remote_id() {
696                proto::participant_location::Variant::SharedProject(
697                    proto::participant_location::SharedProject { id: project_id },
698                )
699            } else {
700                proto::participant_location::Variant::UnsharedProject(
701                    proto::participant_location::UnsharedProject {},
702                )
703            }
704        } else {
705            self.local_participant.active_project = None;
706            proto::participant_location::Variant::External(proto::participant_location::External {})
707        };
708
709        cx.notify();
710        cx.foreground().spawn(async move {
711            client
712                .request(proto::UpdateParticipantLocation {
713                    room_id,
714                    location: Some(proto::ParticipantLocation {
715                        variant: Some(location),
716                    }),
717                })
718                .await?;
719            Ok(())
720        })
721    }
722
723    pub fn is_screen_sharing(&self) -> bool {
724        self.live_kit.as_ref().map_or(false, |live_kit| {
725            !matches!(live_kit.screen_track, ScreenTrack::None)
726        })
727    }
728
729    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
730        if self.status.is_offline() {
731            return Task::ready(Err(anyhow!("room is offline")));
732        } else if self.is_screen_sharing() {
733            return Task::ready(Err(anyhow!("screen was already shared")));
734        }
735
736        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
737            let publish_id = post_inc(&mut live_kit.next_publish_id);
738            live_kit.screen_track = ScreenTrack::Pending { publish_id };
739            cx.notify();
740            (live_kit.room.display_sources(), publish_id)
741        } else {
742            return Task::ready(Err(anyhow!("live-kit was not initialized")));
743        };
744
745        cx.spawn_weak(|this, mut cx| async move {
746            let publish_track = async {
747                let displays = displays.await?;
748                let display = displays
749                    .first()
750                    .ok_or_else(|| anyhow!("no display found"))?;
751                let track = LocalVideoTrack::screen_share_for_display(&display);
752                this.upgrade(&cx)
753                    .ok_or_else(|| anyhow!("room was dropped"))?
754                    .read_with(&cx, |this, _| {
755                        this.live_kit
756                            .as_ref()
757                            .map(|live_kit| live_kit.room.publish_video_track(&track))
758                    })
759                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
760                    .await
761            };
762
763            let publication = publish_track.await;
764            this.upgrade(&cx)
765                .ok_or_else(|| anyhow!("room was dropped"))?
766                .update(&mut cx, |this, cx| {
767                    let live_kit = this
768                        .live_kit
769                        .as_mut()
770                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
771
772                    let canceled = if let ScreenTrack::Pending {
773                        publish_id: cur_publish_id,
774                    } = &live_kit.screen_track
775                    {
776                        *cur_publish_id != publish_id
777                    } else {
778                        true
779                    };
780
781                    match publication {
782                        Ok(publication) => {
783                            if canceled {
784                                live_kit.room.unpublish_track(publication);
785                            } else {
786                                live_kit.screen_track = ScreenTrack::Published(publication);
787                                cx.notify();
788                            }
789                            Ok(())
790                        }
791                        Err(error) => {
792                            if canceled {
793                                Ok(())
794                            } else {
795                                live_kit.screen_track = ScreenTrack::None;
796                                cx.notify();
797                                Err(error)
798                            }
799                        }
800                    }
801                })
802        })
803    }
804
805    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
806        if self.status.is_offline() {
807            return Err(anyhow!("room is offline"));
808        }
809
810        let live_kit = self
811            .live_kit
812            .as_mut()
813            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
814        match mem::take(&mut live_kit.screen_track) {
815            ScreenTrack::None => Err(anyhow!("screen was not shared")),
816            ScreenTrack::Pending { .. } => {
817                cx.notify();
818                Ok(())
819            }
820            ScreenTrack::Published(track) => {
821                live_kit.room.unpublish_track(track);
822                cx.notify();
823                Ok(())
824            }
825        }
826    }
827
828    #[cfg(any(test, feature = "test-support"))]
829    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
830        self.live_kit
831            .as_ref()
832            .unwrap()
833            .room
834            .set_display_sources(sources);
835    }
836}
837
838struct LiveKitRoom {
839    room: Arc<live_kit_client::Room>,
840    screen_track: ScreenTrack,
841    next_publish_id: usize,
842    _maintain_room: Task<()>,
843    _maintain_tracks: Task<()>,
844}
845
846enum ScreenTrack {
847    None,
848    Pending { publish_id: usize },
849    Published(LocalTrackPublication),
850}
851
852impl Default for ScreenTrack {
853    fn default() -> Self {
854        Self::None
855    }
856}
857
858#[derive(Copy, Clone, PartialEq, Eq)]
859pub enum RoomStatus {
860    Online,
861    Rejoining,
862    Offline,
863}
864
865impl RoomStatus {
866    pub fn is_offline(&self) -> bool {
867        matches!(self, RoomStatus::Offline)
868    }
869
870    pub fn is_online(&self) -> bool {
871        matches!(self, RoomStatus::Online)
872    }
873}