room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 11use project::Project;
 12use std::{mem, os::unix::prelude::OsStrExt, sync::Arc};
 13use util::{post_inc, ResultExt};
 14
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A new video frame was stored on one of a remote participant's tracks.
    Frame {
        participant_id: PeerId,
        track_id: live_kit_client::Sid,
    },
    /// A room update listed a project for a remote participant that was not
    /// previously known for that participant.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously known for a remote participant is no longer listed,
    /// or the participant itself was removed from the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave` when the local user leaves the room.
    Left,
}
 31
/// Client-side state of a call room: its participants, the optional LiveKit
/// connection, and the bookkeeping used to decide when to leave.
pub struct Room {
    /// Room identifier, sent along with room-scoped requests.
    id: u64,
    /// LiveKit state; `None` when no connection info was supplied or after leaving.
    live_kit: Option<LiveKitRoom>,
    /// Whether the room is still joined (`Online`) or has been left (`Offline`).
    status: RoomStatus,
    /// State of the local user in this room.
    local_participant: LocalParticipant,
    /// Participants other than the local user, keyed by peer id.
    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
    /// Users listed in the room's `pending_participant_user_ids`.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `call` requests that have not completed yet.
    pending_call_count: usize,
    /// When set, the room is left once `should_leave` finds it empty.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Message-handler subscriptions; cleared when leaving the room.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, if one is still running.
    pending_room_update: Option<Task<()>>,
}
 47
 48impl Entity for Room {
 49    type Event = Event;
 50
 51    fn release(&mut self, _: &mut MutableAppContext) {
 52        if self.status.is_online() {
 53            self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 54        }
 55    }
 56}
 57
 58impl Room {
    /// Builds the initial, online state for a room.
    ///
    /// * `id` - the room id.
    /// * `live_kit_connection_info` - when present, a LiveKit room is created and
    ///   a connection to it is started in the background; otherwise `live_kit`
    ///   stays `None`.
    /// * `client` / `user_store` - stored on the room; the client is also used to
    ///   register the `RoomUpdated` message handler.
    ///
    /// Also spawns a detached task that leaves the room when the client is not
    /// connected or its status changes.
    fn new(
        id: u64,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let mut client_status = client.status();
        // Holds only a weak handle so this watcher does not keep the room alive.
        cx.spawn_weak(|this, mut cx| async move {
            let is_connected = client_status
                .next()
                .await
                .map_or(false, |s| s.is_connected());
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                if let Some(this) = this.upgrade(&cx) {
                    // The result is ignored: `leave` errors if the room is already offline.
                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
                }
            }
        })
        .detach();

        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut track_changes = room.remote_video_track_updates();
            // Forwards remote video track updates to the room until the stream
            // ends or the room model has been dropped.
            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_video_track_updated(track_change, cx).log_err()
                    });
                }
            });
            // Connect in the background; connection errors are logged, not returned.
            cx.foreground()
                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
                .detach_and_log_err(cx);
            Some(LiveKitRoom {
                room,
                screen_track: ScreenTrack::None,
                next_publish_id: 0,
                _maintain_room,
            })
        } else {
            None
        };

        Self {
            id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            // Must be built before `client` is moved into the struct below.
            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
        }
    }
126
127    pub(crate) fn create(
128        recipient_user_id: u64,
129        initial_project: Option<ModelHandle<Project>>,
130        client: Arc<Client>,
131        user_store: ModelHandle<UserStore>,
132        cx: &mut MutableAppContext,
133    ) -> Task<Result<ModelHandle<Self>>> {
134        cx.spawn(|mut cx| async move {
135            let response = client.request(proto::CreateRoom {}).await?;
136            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
137            let room = cx.add_model(|cx| {
138                Self::new(
139                    room_proto.id,
140                    response.live_kit_connection_info,
141                    client,
142                    user_store,
143                    cx,
144                )
145            });
146
147            let initial_project_id = if let Some(initial_project) = initial_project {
148                let initial_project_id = room
149                    .update(&mut cx, |room, cx| {
150                        room.share_project(initial_project.clone(), cx)
151                    })
152                    .await?;
153                Some(initial_project_id)
154            } else {
155                None
156            };
157
158            match room
159                .update(&mut cx, |room, cx| {
160                    room.leave_when_empty = true;
161                    room.call(recipient_user_id, initial_project_id, cx)
162                })
163                .await
164            {
165                Ok(()) => Ok(room),
166                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
167            }
168        })
169    }
170
171    pub(crate) fn join(
172        call: &IncomingCall,
173        client: Arc<Client>,
174        user_store: ModelHandle<UserStore>,
175        cx: &mut MutableAppContext,
176    ) -> Task<Result<ModelHandle<Self>>> {
177        let room_id = call.room_id;
178        cx.spawn(|mut cx| async move {
179            let response = client.request(proto::JoinRoom { id: room_id }).await?;
180            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
181            let room = cx.add_model(|cx| {
182                Self::new(
183                    room_id,
184                    response.live_kit_connection_info,
185                    client,
186                    user_store,
187                    cx,
188                )
189            });
190            room.update(&mut cx, |room, cx| {
191                room.leave_when_empty = true;
192                room.apply_room_update(room_proto, cx)?;
193                anyhow::Ok(())
194            })?;
195            Ok(room)
196        })
197    }
198
199    fn should_leave(&self) -> bool {
200        self.leave_when_empty
201            && self.pending_room_update.is_none()
202            && self.pending_participants.is_empty()
203            && self.remote_participants.is_empty()
204            && self.pending_call_count == 0
205    }
206
207    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
208        if self.status.is_offline() {
209            return Err(anyhow!("room is offline"));
210        }
211
212        cx.notify();
213        cx.emit(Event::Left);
214        self.status = RoomStatus::Offline;
215        self.remote_participants.clear();
216        self.pending_participants.clear();
217        self.participant_user_ids.clear();
218        self.subscriptions.clear();
219        self.live_kit.take();
220        self.client.send(proto::LeaveRoom { id: self.id })?;
221        Ok(())
222    }
223
    /// Returns the id of this room.
    pub fn id(&self) -> u64 {
        self.id
    }
227
    /// Returns whether the room is currently online or offline.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
231
    /// Returns the local user's participant state.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
235
    /// Returns the remote participants of this room, keyed by peer id.
    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
        &self.remote_participants
    }
239
240    pub fn pending_participants(&self) -> &[Arc<User>] {
241        &self.pending_participants
242    }
243
244    pub fn contains_participant(&self, user_id: u64) -> bool {
245        self.participant_user_ids.contains(&user_id)
246    }
247
248    async fn handle_room_updated(
249        this: ModelHandle<Self>,
250        envelope: TypedEnvelope<proto::RoomUpdated>,
251        _: Arc<Client>,
252        mut cx: AsyncAppContext,
253    ) -> Result<()> {
254        let room = envelope
255            .payload
256            .room
257            .ok_or_else(|| anyhow!("invalid room"))?;
258        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
259    }
260
    /// Applies a room snapshot received from the server.
    ///
    /// The users behind the remote and pending participants are fetched through
    /// the user store; the actual state change happens in a spawned task stored
    /// in `pending_room_update` (replacing any previously stored task). Once the
    /// users are available, that task updates the participant collections, emits
    /// `RemoteProjectShared` / `RemoteProjectUnshared` events for project changes,
    /// and leaves the room if `should_leave` reports it is empty.
    ///
    /// As written, this function always returns `Ok(())`; failures to fetch users
    /// are logged inside the task.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        // `swap_remove` does not preserve the order of the remaining participants.
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(room.pending_participant_user_ids, cx),
                )
            });
        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): pairing by position assumes `get_users` returns users
                    // in the same order as the requested ids — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        let peer_id = PeerId(participant.peer_id);
                        this.participant_user_ids.insert(participant.user_id);

                        // Project ids known for this peer before and after the update.
                        let old_projects = this
                            .remote_participants
                            .get(&peer_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        for unshared_project_id in old_projects.difference(&new_projects) {
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // Locations that cannot be decoded are treated as `External`.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
                        {
                            remote_participant.projects = participant.projects;
                            remote_participant.location = location;
                        } else {
                            this.remote_participants.insert(
                                peer_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    projects: participant.projects,
                                    location,
                                    tracks: Default::default(),
                                },
                            );

                            // For a newly added participant, register the video tracks
                            // LiveKit already reports for that peer.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let tracks =
                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
                                for track in tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants that are no longer in the room, reporting
                    // each of their projects as unshared.
                    this.remote_participants.retain(|_, participant| {
                        if this.participant_user_ids.contains(&participant.user.id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Clear the task slot first: `should_leave` requires that no room
                // update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    let _ = this.leave(cx);
                }

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
397
    /// Handles a remote video track being subscribed or unsubscribed.
    ///
    /// On `Subscribed`, the track is stored on its publisher's participant entry
    /// together with a task that copies each incoming frame into the track and
    /// emits `Event::Frame`. On `Unsubscribed`, the track is removed from the
    /// participant.
    ///
    /// # Errors
    ///
    /// Fails if the publisher id cannot be parsed into a peer id, or if no remote
    /// participant is known for that peer.
    fn remote_video_track_updated(
        &mut self,
        change: RemoteVideoTrackUpdate,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match change {
            RemoteVideoTrackUpdate::Subscribed(track) => {
                let peer_id = PeerId(track.publisher_id().parse()?);
                let track_id = track.sid().to_string();
                let participant = self
                    .remote_participants
                    .get_mut(&peer_id)
                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
                let mut frames = track.frames();
                participant.tracks.insert(
                    track_id.clone(),
                    RemoteVideoTrack {
                        frame: None,
                        _live_kit_track: track,
                        // The frame-forwarding task is stored on the track itself.
                        _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
                            while let Some(frame) = frames.next().await {
                                let this = if let Some(this) = this.upgrade(&cx) {
                                    this
                                } else {
                                    break;
                                };

                                // `done` becomes true once the participant or the track
                                // is no longer present in the room.
                                let done = this.update(&mut cx, |this, cx| {
                                    if let Some(track) =
                                        this.remote_participants.get_mut(&peer_id).and_then(
                                            |participant| participant.tracks.get_mut(&track_id),
                                        )
                                    {
                                        track.frame = Some(frame);
                                        cx.emit(Event::Frame {
                                            participant_id: peer_id,
                                            track_id: track_id.clone(),
                                        });
                                        false
                                    } else {
                                        true
                                    }
                                });

                                if done {
                                    break;
                                }
                            }
                        })),
                    },
                );
            }
            RemoteVideoTrackUpdate::Unsubscribed {
                publisher_id,
                track_id,
            } => {
                let peer_id = PeerId(publisher_id.parse()?);
                let participant = self
                    .remote_participants
                    .get_mut(&peer_id)
                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
                participant.tracks.remove(&track_id);
            }
        }

        cx.notify();
        Ok(())
    }
466
467    fn check_invariants(&self) {
468        #[cfg(any(test, feature = "test-support"))]
469        {
470            for participant in self.remote_participants.values() {
471                assert!(self.participant_user_ids.contains(&participant.user.id));
472            }
473
474            for participant in &self.pending_participants {
475                assert!(self.participant_user_ids.contains(&participant.id));
476            }
477
478            assert_eq!(
479                self.participant_user_ids.len(),
480                self.remote_participants.len() + self.pending_participants.len()
481            );
482        }
483    }
484
485    pub(crate) fn call(
486        &mut self,
487        recipient_user_id: u64,
488        initial_project_id: Option<u64>,
489        cx: &mut ModelContext<Self>,
490    ) -> Task<Result<()>> {
491        if self.status.is_offline() {
492            return Task::ready(Err(anyhow!("room is offline")));
493        }
494
495        cx.notify();
496        let client = self.client.clone();
497        let room_id = self.id;
498        self.pending_call_count += 1;
499        cx.spawn(|this, mut cx| async move {
500            let result = client
501                .request(proto::Call {
502                    room_id,
503                    recipient_user_id,
504                    initial_project_id,
505                })
506                .await;
507            this.update(&mut cx, |this, cx| {
508                this.pending_call_count -= 1;
509                if this.should_leave() {
510                    this.leave(cx)?;
511                }
512                result
513            })?;
514            Ok(())
515        })
516    }
517
518    pub(crate) fn share_project(
519        &mut self,
520        project: ModelHandle<Project>,
521        cx: &mut ModelContext<Self>,
522    ) -> Task<Result<u64>> {
523        if let Some(project_id) = project.read(cx).remote_id() {
524            return Task::ready(Ok(project_id));
525        }
526
527        let request = self.client.request(proto::ShareProject {
528            room_id: self.id(),
529            worktrees: project
530                .read(cx)
531                .worktrees(cx)
532                .map(|worktree| {
533                    let worktree = worktree.read(cx);
534                    proto::WorktreeMetadata {
535                        id: worktree.id().to_proto(),
536                        root_name: worktree.root_name().into(),
537                        visible: worktree.is_visible(),
538                        abs_path: worktree.abs_path().as_os_str().as_bytes().to_vec(),
539                    }
540                })
541                .collect(),
542        });
543        cx.spawn(|this, mut cx| async move {
544            let response = request.await?;
545
546            project.update(&mut cx, |project, cx| {
547                project
548                    .shared(response.project_id, cx)
549                    .detach_and_log_err(cx)
550            });
551
552            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
553            this.update(&mut cx, |this, cx| {
554                let active_project = this.local_participant.active_project.as_ref();
555                if active_project.map_or(false, |location| *location == project) {
556                    this.set_location(Some(&project), cx)
557                } else {
558                    Task::ready(Ok(()))
559                }
560            })
561            .await?;
562
563            Ok(response.project_id)
564        })
565    }
566
567    pub fn set_location(
568        &mut self,
569        project: Option<&ModelHandle<Project>>,
570        cx: &mut ModelContext<Self>,
571    ) -> Task<Result<()>> {
572        if self.status.is_offline() {
573            return Task::ready(Err(anyhow!("room is offline")));
574        }
575
576        let client = self.client.clone();
577        let room_id = self.id;
578        let location = if let Some(project) = project {
579            self.local_participant.active_project = Some(project.downgrade());
580            if let Some(project_id) = project.read(cx).remote_id() {
581                proto::participant_location::Variant::SharedProject(
582                    proto::participant_location::SharedProject { id: project_id },
583                )
584            } else {
585                proto::participant_location::Variant::UnsharedProject(
586                    proto::participant_location::UnsharedProject {},
587                )
588            }
589        } else {
590            self.local_participant.active_project = None;
591            proto::participant_location::Variant::External(proto::participant_location::External {})
592        };
593
594        cx.notify();
595        cx.foreground().spawn(async move {
596            client
597                .request(proto::UpdateParticipantLocation {
598                    room_id,
599                    location: Some(proto::ParticipantLocation {
600                        variant: Some(location),
601                    }),
602                })
603                .await?;
604            Ok(())
605        })
606    }
607
608    pub fn is_screen_sharing(&self) -> bool {
609        self.live_kit.as_ref().map_or(false, |live_kit| {
610            !matches!(live_kit.screen_track, ScreenTrack::None)
611        })
612    }
613
    /// Starts sharing the first available display as a video track.
    ///
    /// The screen track is marked `Pending` with a fresh publish id right away;
    /// the returned task publishes the track and then moves the state to
    /// `Published`, or back to `None` on failure. If the pending state was
    /// replaced in the meantime (for example by `unshare_screen`), the attempt is
    /// treated as canceled: a successful publication is unpublished again and a
    /// failure is swallowed.
    ///
    /// # Errors
    ///
    /// Fails immediately if the room is offline, the screen is already being
    /// shared, or LiveKit was not initialized. The task fails if the room was
    /// dropped, LiveKit is gone, no display is found, or publishing fails without
    /// having been canceled.
    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        if self.status.is_offline() {
            return Task::ready(Err(anyhow!("room is offline")));
        } else if self.is_screen_sharing() {
            return Task::ready(Err(anyhow!("screen was already shared")));
        }

        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
            // The publish id identifies this attempt so that completion can tell
            // whether it is still the current one.
            let publish_id = post_inc(&mut live_kit.next_publish_id);
            live_kit.screen_track = ScreenTrack::Pending { publish_id };
            cx.notify();
            (live_kit.room.display_sources(), publish_id)
        } else {
            return Task::ready(Err(anyhow!("live-kit was not initialized")));
        };

        cx.spawn_weak(|this, mut cx| async move {
            let publish_track = async {
                let displays = displays.await?;
                let display = displays
                    .first()
                    .ok_or_else(|| anyhow!("no display found"))?;
                let track = LocalVideoTrack::screen_share_for_display(&display);
                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .read_with(&cx, |this, _| {
                        this.live_kit
                            .as_ref()
                            .map(|live_kit| live_kit.room.publish_video_track(&track))
                    })
                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
                    .await
            };

            // Keep the result (success or failure) so the state below can be
            // updated either way.
            let publication = publish_track.await;
            this.upgrade(&cx)
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    let live_kit = this
                        .live_kit
                        .as_mut()
                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

                    // Canceled unless the track is still pending with our publish id.
                    let canceled = if let ScreenTrack::Pending {
                        publish_id: cur_publish_id,
                    } = &live_kit.screen_track
                    {
                        *cur_publish_id != publish_id
                    } else {
                        true
                    };

                    match publication {
                        Ok(publication) => {
                            if canceled {
                                live_kit.room.unpublish_track(publication);
                            } else {
                                live_kit.screen_track = ScreenTrack::Published(publication);
                                cx.notify();
                            }
                            Ok(())
                        }
                        Err(error) => {
                            if canceled {
                                Ok(())
                            } else {
                                live_kit.screen_track = ScreenTrack::None;
                                cx.notify();
                                Err(error)
                            }
                        }
                    }
                })
        })
    }
689
690    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
691        if self.status.is_offline() {
692            return Err(anyhow!("room is offline"));
693        }
694
695        let live_kit = self
696            .live_kit
697            .as_mut()
698            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
699        match mem::take(&mut live_kit.screen_track) {
700            ScreenTrack::None => Err(anyhow!("screen was not shared")),
701            ScreenTrack::Pending { .. } => {
702                cx.notify();
703                Ok(())
704            }
705            ScreenTrack::Published(track) => {
706                live_kit.room.unpublish_track(track);
707                cx.notify();
708                Ok(())
709            }
710        }
711    }
712
713    #[cfg(any(test, feature = "test-support"))]
714    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
715        self.live_kit
716            .as_ref()
717            .unwrap()
718            .room
719            .set_display_sources(sources);
720    }
721}
722
/// LiveKit-related state owned by a [`Room`].
struct LiveKitRoom {
    /// Handle to the underlying LiveKit room.
    room: Arc<live_kit_client::Room>,
    /// Current state of the local screen share.
    screen_track: ScreenTrack,
    /// Counter handing out the publish id for the next screen-share attempt.
    next_publish_id: usize,
    /// Task forwarding remote video track updates to the room; held here for as
    /// long as this state exists.
    _maintain_room: Task<()>,
}
729
/// State of the local screen share.
pub enum ScreenTrack {
    /// The screen is not being shared.
    None,
    /// A share attempt identified by `publish_id` has started but has not been
    /// published yet.
    Pending { publish_id: usize },
    /// The screen-share track has been published.
    Published(LocalTrackPublication),
}
735
736impl Default for ScreenTrack {
737    fn default() -> Self {
738        Self::None
739    }
740}
741
/// Whether a room is still joined or has been left.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Offline,
}

impl RoomStatus {
    /// Returns `true` for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == RoomStatus::Offline
    }

    /// Returns `true` for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == RoomStatus::Online
    }
}