room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{
  7    proto::{self, PeerId},
  8    Client, TypedEnvelope, User, UserStore,
  9};
 10use collections::{BTreeMap, HashMap, HashSet};
 11use fs::Fs;
 12use futures::{FutureExt, StreamExt};
 13use gpui::{
 14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 15};
 16use language::LanguageRegistry;
 17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 18use postage::stream::Stream;
 19use project::Project;
 20use std::{mem, sync::Arc, time::Duration};
 21use util::{post_inc, ResultExt, TryFutureExt};
 22
 23pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
 24
 25#[derive(Clone, Debug, PartialEq, Eq)]
 26pub enum Event {
 27    ParticipantLocationChanged {
 28        participant_id: proto::PeerId,
 29    },
 30    RemoteVideoTracksChanged {
 31        participant_id: proto::PeerId,
 32    },
 33    RemoteProjectShared {
 34        owner: Arc<User>,
 35        project_id: u64,
 36        worktree_root_names: Vec<String>,
 37    },
 38    RemoteProjectUnshared {
 39        project_id: u64,
 40    },
 41    Left,
 42}
 43
 44pub struct Room {
 45    id: u64,
 46    live_kit: Option<LiveKitRoom>,
 47    status: RoomStatus,
 48    shared_projects: HashSet<WeakModelHandle<Project>>,
 49    joined_projects: HashSet<WeakModelHandle<Project>>,
 50    local_participant: LocalParticipant,
 51    remote_participants: BTreeMap<u64, RemoteParticipant>,
 52    pending_participants: Vec<Arc<User>>,
 53    participant_user_ids: HashSet<u64>,
 54    pending_call_count: usize,
 55    leave_when_empty: bool,
 56    client: Arc<Client>,
 57    user_store: ModelHandle<UserStore>,
 58    subscriptions: Vec<client::Subscription>,
 59    pending_room_update: Option<Task<()>>,
 60    maintain_connection: Option<Task<Option<()>>>,
 61}
 62
 63impl Entity for Room {
 64    type Event = Event;
 65
 66    fn release(&mut self, _: &mut MutableAppContext) {
 67        if self.status.is_online() {
 68            log::info!("room was released, sending leave message");
 69            let _ = self.client.send(proto::LeaveRoom {});
 70        }
 71    }
 72}
 73
 74impl Room {
 75    fn new(
 76        id: u64,
 77        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 78        client: Arc<Client>,
 79        user_store: ModelHandle<UserStore>,
 80        cx: &mut ModelContext<Self>,
 81    ) -> Self {
 82        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 83            let room = live_kit_client::Room::new();
 84            let mut status = room.status();
 85            // Consume the initial status of the room.
 86            let _ = status.try_recv();
 87            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 88                while let Some(status) = status.next().await {
 89                    let this = if let Some(this) = this.upgrade(&cx) {
 90                        this
 91                    } else {
 92                        break;
 93                    };
 94
 95                    if status == live_kit_client::ConnectionState::Disconnected {
 96                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 97                        break;
 98                    }
 99                }
100            });
101
102            let mut track_changes = room.remote_video_track_updates();
103            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
104                while let Some(track_change) = track_changes.next().await {
105                    let this = if let Some(this) = this.upgrade(&cx) {
106                        this
107                    } else {
108                        break;
109                    };
110
111                    this.update(&mut cx, |this, cx| {
112                        this.remote_video_track_updated(track_change, cx).log_err()
113                    });
114                }
115            });
116
117            cx.foreground()
118                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
119                .detach_and_log_err(cx);
120
121            Some(LiveKitRoom {
122                room,
123                screen_track: ScreenTrack::None,
124                next_publish_id: 0,
125                _maintain_room,
126                _maintain_tracks,
127            })
128        } else {
129            None
130        };
131
132        let maintain_connection =
133            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
134
135        Self {
136            id,
137            live_kit: live_kit_room,
138            status: RoomStatus::Online,
139            shared_projects: Default::default(),
140            joined_projects: Default::default(),
141            participant_user_ids: Default::default(),
142            local_participant: Default::default(),
143            remote_participants: Default::default(),
144            pending_participants: Default::default(),
145            pending_call_count: 0,
146            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
147            leave_when_empty: false,
148            pending_room_update: None,
149            client,
150            user_store,
151            maintain_connection: Some(maintain_connection),
152        }
153    }
154
155    pub(crate) fn create(
156        called_user_id: u64,
157        initial_project: Option<ModelHandle<Project>>,
158        client: Arc<Client>,
159        user_store: ModelHandle<UserStore>,
160        cx: &mut MutableAppContext,
161    ) -> Task<Result<ModelHandle<Self>>> {
162        cx.spawn(|mut cx| async move {
163            let response = client.request(proto::CreateRoom {}).await?;
164            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
165            let room = cx.add_model(|cx| {
166                Self::new(
167                    room_proto.id,
168                    response.live_kit_connection_info,
169                    client,
170                    user_store,
171                    cx,
172                )
173            });
174
175            let initial_project_id = if let Some(initial_project) = initial_project {
176                let initial_project_id = room
177                    .update(&mut cx, |room, cx| {
178                        room.share_project(initial_project.clone(), cx)
179                    })
180                    .await?;
181                Some(initial_project_id)
182            } else {
183                None
184            };
185
186            match room
187                .update(&mut cx, |room, cx| {
188                    room.leave_when_empty = true;
189                    room.call(called_user_id, initial_project_id, cx)
190                })
191                .await
192            {
193                Ok(()) => Ok(room),
194                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
195            }
196        })
197    }
198
199    pub(crate) fn join(
200        call: &IncomingCall,
201        client: Arc<Client>,
202        user_store: ModelHandle<UserStore>,
203        cx: &mut MutableAppContext,
204    ) -> Task<Result<ModelHandle<Self>>> {
205        let room_id = call.room_id;
206        cx.spawn(|mut cx| async move {
207            let response = client.request(proto::JoinRoom { id: room_id }).await?;
208            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
209            let room = cx.add_model(|cx| {
210                Self::new(
211                    room_id,
212                    response.live_kit_connection_info,
213                    client,
214                    user_store,
215                    cx,
216                )
217            });
218            room.update(&mut cx, |room, cx| {
219                room.leave_when_empty = true;
220                room.apply_room_update(room_proto, cx)?;
221                anyhow::Ok(())
222            })?;
223            Ok(room)
224        })
225    }
226
227    fn should_leave(&self) -> bool {
228        self.leave_when_empty
229            && self.pending_room_update.is_none()
230            && self.pending_participants.is_empty()
231            && self.remote_participants.is_empty()
232            && self.pending_call_count == 0
233    }
234
235    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
236        if self.status.is_offline() {
237            return Err(anyhow!("room is offline"));
238        }
239
240        cx.notify();
241        cx.emit(Event::Left);
242        log::info!("leaving room");
243
244        for project in self.shared_projects.drain() {
245            if let Some(project) = project.upgrade(cx) {
246                project.update(cx, |project, cx| {
247                    project.unshare(cx).log_err();
248                });
249            }
250        }
251        for project in self.joined_projects.drain() {
252            if let Some(project) = project.upgrade(cx) {
253                project.update(cx, |project, cx| {
254                    project.disconnected_from_host(cx);
255                });
256            }
257        }
258
259        self.status = RoomStatus::Offline;
260        self.remote_participants.clear();
261        self.pending_participants.clear();
262        self.participant_user_ids.clear();
263        self.subscriptions.clear();
264        self.live_kit.take();
265        self.pending_room_update.take();
266        self.maintain_connection.take();
267        self.client.send(proto::LeaveRoom {})?;
268        Ok(())
269    }
270
271    async fn maintain_connection(
272        this: WeakModelHandle<Self>,
273        client: Arc<Client>,
274        mut cx: AsyncAppContext,
275    ) -> Result<()> {
276        let mut client_status = client.status();
277        loop {
278            let is_connected = client_status
279                .next()
280                .await
281                .map_or(false, |s| s.is_connected());
282
283            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
284            if !is_connected || client_status.next().await.is_some() {
285                log::info!("detected client disconnection");
286                this.upgrade(&cx)
287                    .ok_or_else(|| anyhow!("room was dropped"))?
288                    .update(&mut cx, |this, cx| {
289                        this.status = RoomStatus::Rejoining;
290                        cx.notify();
291                    });
292
293                // Wait for client to re-establish a connection to the server.
294                {
295                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
296                    let client_reconnection = async {
297                        let mut remaining_attempts = 3;
298                        while remaining_attempts > 0 {
299                            log::info!(
300                                "waiting for client status change, remaining attempts {}",
301                                remaining_attempts
302                            );
303                            let Some(status) = client_status.next().await else { break };
304                            if status.is_connected() {
305                                log::info!("client reconnected, attempting to rejoin room");
306
307                                let Some(this) = this.upgrade(&cx) else { break };
308                                if this
309                                    .update(&mut cx, |this, cx| this.rejoin(cx))
310                                    .await
311                                    .log_err()
312                                    .is_some()
313                                {
314                                    return true;
315                                } else {
316                                    remaining_attempts -= 1;
317                                }
318                            }
319                        }
320                        false
321                    }
322                    .fuse();
323                    futures::pin_mut!(client_reconnection);
324
325                    futures::select_biased! {
326                        reconnected = client_reconnection => {
327                            if reconnected {
328                                log::info!("successfully reconnected to room");
329                                // If we successfully joined the room, go back around the loop
330                                // waiting for future connection status changes.
331                                continue;
332                            }
333                        }
334                        _ = reconnection_timeout => {
335                            log::info!("room reconnection timeout expired");
336                        }
337                    }
338                }
339
340                // The client failed to re-establish a connection to the server
341                // or an error occurred while trying to re-join the room. Either way
342                // we leave the room and return an error.
343                if let Some(this) = this.upgrade(&cx) {
344                    log::info!("reconnection failed, leaving room");
345                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
346                }
347                return Err(anyhow!(
348                    "can't reconnect to room: client failed to re-establish connection"
349                ));
350            }
351        }
352    }
353
354    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
355        let mut projects = HashMap::default();
356        let mut reshared_projects = Vec::new();
357        let mut rejoined_projects = Vec::new();
358        self.shared_projects.retain(|project| {
359            if let Some(handle) = project.upgrade(cx) {
360                let project = handle.read(cx);
361                if let Some(project_id) = project.remote_id() {
362                    projects.insert(project_id, handle.clone());
363                    reshared_projects.push(proto::UpdateProject {
364                        project_id,
365                        worktrees: project.worktree_metadata_protos(cx),
366                    });
367                    return true;
368                }
369            }
370            false
371        });
372        self.joined_projects.retain(|project| {
373            if let Some(handle) = project.upgrade(cx) {
374                let project = handle.read(cx);
375                if let Some(project_id) = project.remote_id() {
376                    projects.insert(project_id, handle.clone());
377                    rejoined_projects.push(proto::RejoinProject {
378                        id: project_id,
379                        worktrees: project
380                            .worktrees(cx)
381                            .map(|worktree| {
382                                let worktree = worktree.read(cx);
383                                proto::RejoinWorktree {
384                                    id: worktree.id().to_proto(),
385                                    scan_id: worktree.completed_scan_id() as u64,
386                                }
387                            })
388                            .collect(),
389                    });
390                }
391                return true;
392            }
393            false
394        });
395
396        let response = self.client.request(proto::RejoinRoom {
397            id: self.id,
398            reshared_projects,
399            rejoined_projects,
400        });
401
402        cx.spawn(|this, mut cx| async move {
403            let response = response.await?;
404            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
405            this.update(&mut cx, |this, cx| {
406                this.status = RoomStatus::Online;
407                this.apply_room_update(room_proto, cx)?;
408
409                for reshared_project in response.reshared_projects {
410                    if let Some(project) = projects.get(&reshared_project.id) {
411                        project.update(cx, |project, cx| {
412                            project.reshared(reshared_project, cx).log_err();
413                        });
414                    }
415                }
416
417                for rejoined_project in response.rejoined_projects {
418                    if let Some(project) = projects.get(&rejoined_project.id) {
419                        project.update(cx, |project, cx| {
420                            project.rejoined(rejoined_project, cx).log_err();
421                        });
422                    }
423                }
424
425                anyhow::Ok(())
426            })
427        })
428    }
429
430    pub fn id(&self) -> u64 {
431        self.id
432    }
433
434    pub fn status(&self) -> RoomStatus {
435        self.status
436    }
437
438    pub fn local_participant(&self) -> &LocalParticipant {
439        &self.local_participant
440    }
441
442    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
443        &self.remote_participants
444    }
445
446    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
447        self.remote_participants
448            .values()
449            .find(|p| p.peer_id == peer_id)
450    }
451
452    pub fn pending_participants(&self) -> &[Arc<User>] {
453        &self.pending_participants
454    }
455
456    pub fn contains_participant(&self, user_id: u64) -> bool {
457        self.participant_user_ids.contains(&user_id)
458    }
459
460    async fn handle_room_updated(
461        this: ModelHandle<Self>,
462        envelope: TypedEnvelope<proto::RoomUpdated>,
463        _: Arc<Client>,
464        mut cx: AsyncAppContext,
465    ) -> Result<()> {
466        let room = envelope
467            .payload
468            .room
469            .ok_or_else(|| anyhow!("invalid room"))?;
470        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
471    }
472
473    fn apply_room_update(
474        &mut self,
475        mut room: proto::Room,
476        cx: &mut ModelContext<Self>,
477    ) -> Result<()> {
478        // Filter ourselves out from the room's participants.
479        let local_participant_ix = room
480            .participants
481            .iter()
482            .position(|participant| Some(participant.user_id) == self.client.user_id());
483        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
484
485        let pending_participant_user_ids = room
486            .pending_participants
487            .iter()
488            .map(|p| p.user_id)
489            .collect::<Vec<_>>();
490        let remote_participant_user_ids = room
491            .participants
492            .iter()
493            .map(|p| p.user_id)
494            .collect::<Vec<_>>();
495        let (remote_participants, pending_participants) =
496            self.user_store.update(cx, move |user_store, cx| {
497                (
498                    user_store.get_users(remote_participant_user_ids, cx),
499                    user_store.get_users(pending_participant_user_ids, cx),
500                )
501            });
502        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
503            let (remote_participants, pending_participants) =
504                futures::join!(remote_participants, pending_participants);
505
506            this.update(&mut cx, |this, cx| {
507                this.participant_user_ids.clear();
508
509                if let Some(participant) = local_participant {
510                    this.local_participant.projects = participant.projects;
511                } else {
512                    this.local_participant.projects.clear();
513                }
514
515                if let Some(participants) = remote_participants.log_err() {
516                    for (participant, user) in room.participants.into_iter().zip(participants) {
517                        let Some(peer_id) = participant.peer_id else { continue };
518                        this.participant_user_ids.insert(participant.user_id);
519
520                        let old_projects = this
521                            .remote_participants
522                            .get(&participant.user_id)
523                            .into_iter()
524                            .flat_map(|existing| &existing.projects)
525                            .map(|project| project.id)
526                            .collect::<HashSet<_>>();
527                        let new_projects = participant
528                            .projects
529                            .iter()
530                            .map(|project| project.id)
531                            .collect::<HashSet<_>>();
532
533                        for project in &participant.projects {
534                            if !old_projects.contains(&project.id) {
535                                cx.emit(Event::RemoteProjectShared {
536                                    owner: user.clone(),
537                                    project_id: project.id,
538                                    worktree_root_names: project.worktree_root_names.clone(),
539                                });
540                            }
541                        }
542
543                        for unshared_project_id in old_projects.difference(&new_projects) {
544                            this.joined_projects.retain(|project| {
545                                if let Some(project) = project.upgrade(cx) {
546                                    project.update(cx, |project, cx| {
547                                        if project.remote_id() == Some(*unshared_project_id) {
548                                            project.disconnected_from_host(cx);
549                                            false
550                                        } else {
551                                            true
552                                        }
553                                    })
554                                } else {
555                                    false
556                                }
557                            });
558                            cx.emit(Event::RemoteProjectUnshared {
559                                project_id: *unshared_project_id,
560                            });
561                        }
562
563                        let location = ParticipantLocation::from_proto(participant.location)
564                            .unwrap_or(ParticipantLocation::External);
565                        if let Some(remote_participant) =
566                            this.remote_participants.get_mut(&participant.user_id)
567                        {
568                            remote_participant.projects = participant.projects;
569                            remote_participant.peer_id = peer_id;
570                            if location != remote_participant.location {
571                                remote_participant.location = location;
572                                cx.emit(Event::ParticipantLocationChanged {
573                                    participant_id: peer_id,
574                                });
575                            }
576                        } else {
577                            this.remote_participants.insert(
578                                participant.user_id,
579                                RemoteParticipant {
580                                    user: user.clone(),
581                                    peer_id,
582                                    projects: participant.projects,
583                                    location,
584                                    tracks: Default::default(),
585                                },
586                            );
587
588                            if let Some(live_kit) = this.live_kit.as_ref() {
589                                let tracks =
590                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
591                                for track in tracks {
592                                    this.remote_video_track_updated(
593                                        RemoteVideoTrackUpdate::Subscribed(track),
594                                        cx,
595                                    )
596                                    .log_err();
597                                }
598                            }
599                        }
600                    }
601
602                    this.remote_participants.retain(|user_id, participant| {
603                        if this.participant_user_ids.contains(user_id) {
604                            true
605                        } else {
606                            for project in &participant.projects {
607                                cx.emit(Event::RemoteProjectUnshared {
608                                    project_id: project.id,
609                                });
610                            }
611                            false
612                        }
613                    });
614                }
615
616                if let Some(pending_participants) = pending_participants.log_err() {
617                    this.pending_participants = pending_participants;
618                    for participant in &this.pending_participants {
619                        this.participant_user_ids.insert(participant.id);
620                    }
621                }
622
623                this.pending_room_update.take();
624                if this.should_leave() {
625                    log::info!("room is empty, leaving");
626                    let _ = this.leave(cx);
627                }
628
629                this.check_invariants();
630                cx.notify();
631            });
632        }));
633
634        cx.notify();
635        Ok(())
636    }
637
638    fn remote_video_track_updated(
639        &mut self,
640        change: RemoteVideoTrackUpdate,
641        cx: &mut ModelContext<Self>,
642    ) -> Result<()> {
643        match change {
644            RemoteVideoTrackUpdate::Subscribed(track) => {
645                let user_id = track.publisher_id().parse()?;
646                let track_id = track.sid().to_string();
647                let participant = self
648                    .remote_participants
649                    .get_mut(&user_id)
650                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
651                participant.tracks.insert(
652                    track_id.clone(),
653                    Arc::new(RemoteVideoTrack {
654                        live_kit_track: track,
655                    }),
656                );
657                cx.emit(Event::RemoteVideoTracksChanged {
658                    participant_id: participant.peer_id,
659                });
660            }
661            RemoteVideoTrackUpdate::Unsubscribed {
662                publisher_id,
663                track_id,
664            } => {
665                let user_id = publisher_id.parse()?;
666                let participant = self
667                    .remote_participants
668                    .get_mut(&user_id)
669                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
670                participant.tracks.remove(&track_id);
671                cx.emit(Event::RemoteVideoTracksChanged {
672                    participant_id: participant.peer_id,
673                });
674            }
675        }
676
677        cx.notify();
678        Ok(())
679    }
680
681    fn check_invariants(&self) {
682        #[cfg(any(test, feature = "test-support"))]
683        {
684            for participant in self.remote_participants.values() {
685                assert!(self.participant_user_ids.contains(&participant.user.id));
686                assert_ne!(participant.user.id, self.client.user_id().unwrap());
687            }
688
689            for participant in &self.pending_participants {
690                assert!(self.participant_user_ids.contains(&participant.id));
691                assert_ne!(participant.id, self.client.user_id().unwrap());
692            }
693
694            assert_eq!(
695                self.participant_user_ids.len(),
696                self.remote_participants.len() + self.pending_participants.len()
697            );
698        }
699    }
700
701    pub(crate) fn call(
702        &mut self,
703        called_user_id: u64,
704        initial_project_id: Option<u64>,
705        cx: &mut ModelContext<Self>,
706    ) -> Task<Result<()>> {
707        if self.status.is_offline() {
708            return Task::ready(Err(anyhow!("room is offline")));
709        }
710
711        cx.notify();
712        let client = self.client.clone();
713        let room_id = self.id;
714        self.pending_call_count += 1;
715        cx.spawn(|this, mut cx| async move {
716            let result = client
717                .request(proto::Call {
718                    room_id,
719                    called_user_id,
720                    initial_project_id,
721                })
722                .await;
723            this.update(&mut cx, |this, cx| {
724                this.pending_call_count -= 1;
725                if this.should_leave() {
726                    this.leave(cx)?;
727                }
728                result
729            })?;
730            Ok(())
731        })
732    }
733
734    pub fn join_project(
735        &mut self,
736        id: u64,
737        language_registry: Arc<LanguageRegistry>,
738        fs: Arc<dyn Fs>,
739        cx: &mut ModelContext<Self>,
740    ) -> Task<Result<ModelHandle<Project>>> {
741        let client = self.client.clone();
742        let user_store = self.user_store.clone();
743        cx.spawn(|this, mut cx| async move {
744            let project =
745                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
746            this.update(&mut cx, |this, cx| {
747                this.joined_projects.retain(|project| {
748                    if let Some(project) = project.upgrade(cx) {
749                        !project.read(cx).is_read_only()
750                    } else {
751                        false
752                    }
753                });
754                this.joined_projects.insert(project.downgrade());
755            });
756            Ok(project)
757        })
758    }
759
760    pub(crate) fn share_project(
761        &mut self,
762        project: ModelHandle<Project>,
763        cx: &mut ModelContext<Self>,
764    ) -> Task<Result<u64>> {
765        if let Some(project_id) = project.read(cx).remote_id() {
766            return Task::ready(Ok(project_id));
767        }
768
769        let request = self.client.request(proto::ShareProject {
770            room_id: self.id(),
771            worktrees: project.read(cx).worktree_metadata_protos(cx),
772        });
773        cx.spawn(|this, mut cx| async move {
774            let response = request.await?;
775
776            project.update(&mut cx, |project, cx| {
777                project.shared(response.project_id, cx)
778            })?;
779
780            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
781            this.update(&mut cx, |this, cx| {
782                this.shared_projects.insert(project.downgrade());
783                let active_project = this.local_participant.active_project.as_ref();
784                if active_project.map_or(false, |location| *location == project) {
785                    this.set_location(Some(&project), cx)
786                } else {
787                    Task::ready(Ok(()))
788                }
789            })
790            .await?;
791
792            Ok(response.project_id)
793        })
794    }
795
796    pub(crate) fn set_location(
797        &mut self,
798        project: Option<&ModelHandle<Project>>,
799        cx: &mut ModelContext<Self>,
800    ) -> Task<Result<()>> {
801        if self.status.is_offline() {
802            return Task::ready(Err(anyhow!("room is offline")));
803        }
804
805        let client = self.client.clone();
806        let room_id = self.id;
807        let location = if let Some(project) = project {
808            self.local_participant.active_project = Some(project.downgrade());
809            if let Some(project_id) = project.read(cx).remote_id() {
810                proto::participant_location::Variant::SharedProject(
811                    proto::participant_location::SharedProject { id: project_id },
812                )
813            } else {
814                proto::participant_location::Variant::UnsharedProject(
815                    proto::participant_location::UnsharedProject {},
816                )
817            }
818        } else {
819            self.local_participant.active_project = None;
820            proto::participant_location::Variant::External(proto::participant_location::External {})
821        };
822
823        cx.notify();
824        cx.foreground().spawn(async move {
825            client
826                .request(proto::UpdateParticipantLocation {
827                    room_id,
828                    location: Some(proto::ParticipantLocation {
829                        variant: Some(location),
830                    }),
831                })
832                .await?;
833            Ok(())
834        })
835    }
836
837    pub fn is_screen_sharing(&self) -> bool {
838        self.live_kit.as_ref().map_or(false, |live_kit| {
839            !matches!(live_kit.screen_track, ScreenTrack::None)
840        })
841    }
842
843    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
844        if self.status.is_offline() {
845            return Task::ready(Err(anyhow!("room is offline")));
846        } else if self.is_screen_sharing() {
847            return Task::ready(Err(anyhow!("screen was already shared")));
848        }
849
850        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
851            let publish_id = post_inc(&mut live_kit.next_publish_id);
852            live_kit.screen_track = ScreenTrack::Pending { publish_id };
853            cx.notify();
854            (live_kit.room.display_sources(), publish_id)
855        } else {
856            return Task::ready(Err(anyhow!("live-kit was not initialized")));
857        };
858
859        cx.spawn_weak(|this, mut cx| async move {
860            let publish_track = async {
861                let displays = displays.await?;
862                let display = displays
863                    .first()
864                    .ok_or_else(|| anyhow!("no display found"))?;
865                let track = LocalVideoTrack::screen_share_for_display(&display);
866                this.upgrade(&cx)
867                    .ok_or_else(|| anyhow!("room was dropped"))?
868                    .read_with(&cx, |this, _| {
869                        this.live_kit
870                            .as_ref()
871                            .map(|live_kit| live_kit.room.publish_video_track(&track))
872                    })
873                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
874                    .await
875            };
876
877            let publication = publish_track.await;
878            this.upgrade(&cx)
879                .ok_or_else(|| anyhow!("room was dropped"))?
880                .update(&mut cx, |this, cx| {
881                    let live_kit = this
882                        .live_kit
883                        .as_mut()
884                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
885
886                    let canceled = if let ScreenTrack::Pending {
887                        publish_id: cur_publish_id,
888                    } = &live_kit.screen_track
889                    {
890                        *cur_publish_id != publish_id
891                    } else {
892                        true
893                    };
894
895                    match publication {
896                        Ok(publication) => {
897                            if canceled {
898                                live_kit.room.unpublish_track(publication);
899                            } else {
900                                live_kit.screen_track = ScreenTrack::Published(publication);
901                                cx.notify();
902                            }
903                            Ok(())
904                        }
905                        Err(error) => {
906                            if canceled {
907                                Ok(())
908                            } else {
909                                live_kit.screen_track = ScreenTrack::None;
910                                cx.notify();
911                                Err(error)
912                            }
913                        }
914                    }
915                })
916        })
917    }
918
919    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
920        if self.status.is_offline() {
921            return Err(anyhow!("room is offline"));
922        }
923
924        let live_kit = self
925            .live_kit
926            .as_mut()
927            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
928        match mem::take(&mut live_kit.screen_track) {
929            ScreenTrack::None => Err(anyhow!("screen was not shared")),
930            ScreenTrack::Pending { .. } => {
931                cx.notify();
932                Ok(())
933            }
934            ScreenTrack::Published(track) => {
935                live_kit.room.unpublish_track(track);
936                cx.notify();
937                Ok(())
938            }
939        }
940    }
941
942    #[cfg(any(test, feature = "test-support"))]
943    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
944        self.live_kit
945            .as_ref()
946            .unwrap()
947            .room
948            .set_display_sources(sources);
949    }
950}
951
952struct LiveKitRoom {
953    room: Arc<live_kit_client::Room>,
954    screen_track: ScreenTrack,
955    next_publish_id: usize,
956    _maintain_room: Task<()>,
957    _maintain_tracks: Task<()>,
958}
959
960enum ScreenTrack {
961    None,
962    Pending { publish_id: usize },
963    Published(LocalTrackPublication),
964}
965
966impl Default for ScreenTrack {
967    fn default() -> Self {
968        Self::None
969    }
970}
971
972#[derive(Copy, Clone, PartialEq, Eq)]
973pub enum RoomStatus {
974    Online,
975    Rejoining,
976    Offline,
977}
978
979impl RoomStatus {
980    pub fn is_offline(&self) -> bool {
981        matches!(self, RoomStatus::Offline)
982    }
983
984    pub fn is_online(&self) -> bool {
985        matches!(self, RoomStatus::Online)
986    }
987}