room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 11use postage::watch;
 12use project::Project;
 13use std::{os::unix::prelude::OsStrExt, sync::Arc};
 14use util::ResultExt;
 15
 16#[derive(Clone, Debug, PartialEq, Eq)]
 17pub enum Event {
 18    Frame {
 19        participant_id: PeerId,
 20        track_id: live_kit_client::Sid,
 21    },
 22    RemoteProjectShared {
 23        owner: Arc<User>,
 24        project_id: u64,
 25        worktree_root_names: Vec<String>,
 26    },
 27    RemoteProjectUnshared {
 28        project_id: u64,
 29    },
 30    Left,
 31}
 32
 33pub struct Room {
 34    id: u64,
 35    live_kit: Option<LiveKitRoom>,
 36    status: RoomStatus,
 37    local_participant: LocalParticipant,
 38    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 39    pending_participants: Vec<Arc<User>>,
 40    participant_user_ids: HashSet<u64>,
 41    pending_call_count: usize,
 42    leave_when_empty: bool,
 43    client: Arc<Client>,
 44    user_store: ModelHandle<UserStore>,
 45    subscriptions: Vec<client::Subscription>,
 46    pending_room_update: Option<Task<()>>,
 47}
 48
 49impl Entity for Room {
 50    type Event = Event;
 51
 52    fn release(&mut self, _: &mut MutableAppContext) {
 53        if self.status.is_online() {
 54            self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 55        }
 56    }
 57}
 58
 59impl Room {
 60    fn new(
 61        id: u64,
 62        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 63        client: Arc<Client>,
 64        user_store: ModelHandle<UserStore>,
 65        cx: &mut ModelContext<Self>,
 66    ) -> Self {
 67        let mut client_status = client.status();
 68        cx.spawn_weak(|this, mut cx| async move {
 69            let is_connected = client_status
 70                .next()
 71                .await
 72                .map_or(false, |s| s.is_connected());
 73            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 74            if !is_connected || client_status.next().await.is_some() {
 75                if let Some(this) = this.upgrade(&cx) {
 76                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 77                }
 78            }
 79        })
 80        .detach();
 81
 82        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 83            let room = live_kit_client::Room::new();
 84            let mut track_changes = room.remote_video_track_updates();
 85            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 86                while let Some(track_change) = track_changes.next().await {
 87                    let this = if let Some(this) = this.upgrade(&cx) {
 88                        this
 89                    } else {
 90                        break;
 91                    };
 92
 93                    this.update(&mut cx, |this, cx| {
 94                        this.remote_video_track_updated(track_change, cx).log_err()
 95                    });
 96                }
 97            });
 98            cx.foreground()
 99                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
100                .detach_and_log_err(cx);
101            Some(LiveKitRoom {
102                room,
103                screen_track: None,
104                _maintain_room,
105            })
106        } else {
107            None
108        };
109
110        Self {
111            id,
112            live_kit: live_kit_room,
113            status: RoomStatus::Online,
114            participant_user_ids: Default::default(),
115            local_participant: Default::default(),
116            remote_participants: Default::default(),
117            pending_participants: Default::default(),
118            pending_call_count: 0,
119            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
120            leave_when_empty: false,
121            pending_room_update: None,
122            client,
123            user_store,
124        }
125    }
126
127    pub(crate) fn create(
128        recipient_user_id: u64,
129        initial_project: Option<ModelHandle<Project>>,
130        client: Arc<Client>,
131        user_store: ModelHandle<UserStore>,
132        cx: &mut MutableAppContext,
133    ) -> Task<Result<ModelHandle<Self>>> {
134        cx.spawn(|mut cx| async move {
135            let response = client.request(proto::CreateRoom {}).await?;
136            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
137            let room = cx.add_model(|cx| {
138                Self::new(
139                    room_proto.id,
140                    response.live_kit_connection_info,
141                    client,
142                    user_store,
143                    cx,
144                )
145            });
146
147            let initial_project_id = if let Some(initial_project) = initial_project {
148                let initial_project_id = room
149                    .update(&mut cx, |room, cx| {
150                        room.share_project(initial_project.clone(), cx)
151                    })
152                    .await?;
153                Some(initial_project_id)
154            } else {
155                None
156            };
157
158            match room
159                .update(&mut cx, |room, cx| {
160                    room.leave_when_empty = true;
161                    room.call(recipient_user_id, initial_project_id, cx)
162                })
163                .await
164            {
165                Ok(()) => Ok(room),
166                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
167            }
168        })
169    }
170
171    pub(crate) fn join(
172        call: &IncomingCall,
173        client: Arc<Client>,
174        user_store: ModelHandle<UserStore>,
175        cx: &mut MutableAppContext,
176    ) -> Task<Result<ModelHandle<Self>>> {
177        let room_id = call.room_id;
178        cx.spawn(|mut cx| async move {
179            let response = client.request(proto::JoinRoom { id: room_id }).await?;
180            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
181            let room = cx.add_model(|cx| {
182                Self::new(
183                    room_id,
184                    response.live_kit_connection_info,
185                    client,
186                    user_store,
187                    cx,
188                )
189            });
190            room.update(&mut cx, |room, cx| {
191                room.leave_when_empty = true;
192                room.apply_room_update(room_proto, cx)?;
193                anyhow::Ok(())
194            })?;
195            Ok(room)
196        })
197    }
198
199    fn should_leave(&self) -> bool {
200        self.leave_when_empty
201            && self.pending_room_update.is_none()
202            && self.pending_participants.is_empty()
203            && self.remote_participants.is_empty()
204            && self.pending_call_count == 0
205    }
206
207    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
208        if self.status.is_offline() {
209            return Err(anyhow!("room is offline"));
210        }
211
212        cx.notify();
213        cx.emit(Event::Left);
214        self.status = RoomStatus::Offline;
215        self.remote_participants.clear();
216        self.pending_participants.clear();
217        self.participant_user_ids.clear();
218        self.subscriptions.clear();
219        self.live_kit.take();
220        self.client.send(proto::LeaveRoom { id: self.id })?;
221        Ok(())
222    }
223
224    pub fn id(&self) -> u64 {
225        self.id
226    }
227
228    pub fn status(&self) -> RoomStatus {
229        self.status
230    }
231
232    pub fn local_participant(&self) -> &LocalParticipant {
233        &self.local_participant
234    }
235
236    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
237        &self.remote_participants
238    }
239
240    pub fn pending_participants(&self) -> &[Arc<User>] {
241        &self.pending_participants
242    }
243
244    pub fn contains_participant(&self, user_id: u64) -> bool {
245        self.participant_user_ids.contains(&user_id)
246    }
247
248    async fn handle_room_updated(
249        this: ModelHandle<Self>,
250        envelope: TypedEnvelope<proto::RoomUpdated>,
251        _: Arc<Client>,
252        mut cx: AsyncAppContext,
253    ) -> Result<()> {
254        let room = envelope
255            .payload
256            .room
257            .ok_or_else(|| anyhow!("invalid room"))?;
258        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
259    }
260
261    fn apply_room_update(
262        &mut self,
263        mut room: proto::Room,
264        cx: &mut ModelContext<Self>,
265    ) -> Result<()> {
266        // Filter ourselves out from the room's participants.
267        let local_participant_ix = room
268            .participants
269            .iter()
270            .position(|participant| Some(participant.user_id) == self.client.user_id());
271        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
272
273        let remote_participant_user_ids = room
274            .participants
275            .iter()
276            .map(|p| p.user_id)
277            .collect::<Vec<_>>();
278        let (remote_participants, pending_participants) =
279            self.user_store.update(cx, move |user_store, cx| {
280                (
281                    user_store.get_users(remote_participant_user_ids, cx),
282                    user_store.get_users(room.pending_participant_user_ids, cx),
283                )
284            });
285        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
286            let (remote_participants, pending_participants) =
287                futures::join!(remote_participants, pending_participants);
288
289            this.update(&mut cx, |this, cx| {
290                this.participant_user_ids.clear();
291
292                if let Some(participant) = local_participant {
293                    this.local_participant.projects = participant.projects;
294                } else {
295                    this.local_participant.projects.clear();
296                }
297
298                if let Some(participants) = remote_participants.log_err() {
299                    for (participant, user) in room.participants.into_iter().zip(participants) {
300                        let peer_id = PeerId(participant.peer_id);
301                        this.participant_user_ids.insert(participant.user_id);
302
303                        let old_projects = this
304                            .remote_participants
305                            .get(&peer_id)
306                            .into_iter()
307                            .flat_map(|existing| &existing.projects)
308                            .map(|project| project.id)
309                            .collect::<HashSet<_>>();
310                        let new_projects = participant
311                            .projects
312                            .iter()
313                            .map(|project| project.id)
314                            .collect::<HashSet<_>>();
315
316                        for project in &participant.projects {
317                            if !old_projects.contains(&project.id) {
318                                cx.emit(Event::RemoteProjectShared {
319                                    owner: user.clone(),
320                                    project_id: project.id,
321                                    worktree_root_names: project.worktree_root_names.clone(),
322                                });
323                            }
324                        }
325
326                        for unshared_project_id in old_projects.difference(&new_projects) {
327                            cx.emit(Event::RemoteProjectUnshared {
328                                project_id: *unshared_project_id,
329                            });
330                        }
331
332                        let location = ParticipantLocation::from_proto(participant.location)
333                            .unwrap_or(ParticipantLocation::External);
334                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
335                        {
336                            remote_participant.projects = participant.projects;
337                            remote_participant.location = location;
338                        } else {
339                            this.remote_participants.insert(
340                                peer_id,
341                                RemoteParticipant {
342                                    user: user.clone(),
343                                    projects: participant.projects,
344                                    location,
345                                    tracks: Default::default(),
346                                },
347                            );
348
349                            if let Some(live_kit) = this.live_kit.as_ref() {
350                                let tracks =
351                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
352                                for track in tracks {
353                                    this.remote_video_track_updated(
354                                        RemoteVideoTrackUpdate::Subscribed(track),
355                                        cx,
356                                    )
357                                    .log_err();
358                                }
359                            }
360                        }
361                    }
362
363                    this.remote_participants.retain(|_, participant| {
364                        if this.participant_user_ids.contains(&participant.user.id) {
365                            true
366                        } else {
367                            for project in &participant.projects {
368                                cx.emit(Event::RemoteProjectUnshared {
369                                    project_id: project.id,
370                                });
371                            }
372                            false
373                        }
374                    });
375                }
376
377                if let Some(pending_participants) = pending_participants.log_err() {
378                    this.pending_participants = pending_participants;
379                    for participant in &this.pending_participants {
380                        this.participant_user_ids.insert(participant.id);
381                    }
382                }
383
384                this.pending_room_update.take();
385                if this.should_leave() {
386                    let _ = this.leave(cx);
387                }
388
389                this.check_invariants();
390                cx.notify();
391            });
392        }));
393
394        cx.notify();
395        Ok(())
396    }
397
398    fn remote_video_track_updated(
399        &mut self,
400        change: RemoteVideoTrackUpdate,
401        cx: &mut ModelContext<Self>,
402    ) -> Result<()> {
403        match change {
404            RemoteVideoTrackUpdate::Subscribed(track) => {
405                let peer_id = PeerId(track.publisher_id().parse()?);
406                let track_id = track.sid().to_string();
407                let participant = self
408                    .remote_participants
409                    .get_mut(&peer_id)
410                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
411                let (mut tx, mut rx) = watch::channel();
412                track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame));
413                participant.tracks.insert(
414                    track_id.clone(),
415                    RemoteVideoTrack {
416                        frame: None,
417                        _live_kit_track: track,
418                        _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
419                            while let Some(frame) = rx.next().await {
420                                let this = if let Some(this) = this.upgrade(&cx) {
421                                    this
422                                } else {
423                                    break;
424                                };
425
426                                let done = this.update(&mut cx, |this, cx| {
427                                    if let Some(track) =
428                                        this.remote_participants.get_mut(&peer_id).and_then(
429                                            |participant| participant.tracks.get_mut(&track_id),
430                                        )
431                                    {
432                                        track.frame = frame;
433                                        cx.emit(Event::Frame {
434                                            participant_id: peer_id,
435                                            track_id: track_id.clone(),
436                                        });
437                                        false
438                                    } else {
439                                        true
440                                    }
441                                });
442
443                                if done {
444                                    break;
445                                }
446                            }
447                        })),
448                    },
449                );
450            }
451            RemoteVideoTrackUpdate::Unsubscribed {
452                publisher_id,
453                track_id,
454            } => {
455                let peer_id = PeerId(publisher_id.parse()?);
456                let participant = self
457                    .remote_participants
458                    .get_mut(&peer_id)
459                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
460                participant.tracks.remove(&track_id);
461            }
462        }
463
464        cx.notify();
465        Ok(())
466    }
467
468    fn check_invariants(&self) {
469        #[cfg(any(test, feature = "test-support"))]
470        {
471            for participant in self.remote_participants.values() {
472                assert!(self.participant_user_ids.contains(&participant.user.id));
473            }
474
475            for participant in &self.pending_participants {
476                assert!(self.participant_user_ids.contains(&participant.id));
477            }
478
479            assert_eq!(
480                self.participant_user_ids.len(),
481                self.remote_participants.len() + self.pending_participants.len()
482            );
483        }
484    }
485
486    pub(crate) fn call(
487        &mut self,
488        recipient_user_id: u64,
489        initial_project_id: Option<u64>,
490        cx: &mut ModelContext<Self>,
491    ) -> Task<Result<()>> {
492        if self.status.is_offline() {
493            return Task::ready(Err(anyhow!("room is offline")));
494        }
495
496        cx.notify();
497        let client = self.client.clone();
498        let room_id = self.id;
499        self.pending_call_count += 1;
500        cx.spawn(|this, mut cx| async move {
501            let result = client
502                .request(proto::Call {
503                    room_id,
504                    recipient_user_id,
505                    initial_project_id,
506                })
507                .await;
508            this.update(&mut cx, |this, cx| {
509                this.pending_call_count -= 1;
510                if this.should_leave() {
511                    this.leave(cx)?;
512                }
513                result
514            })?;
515            Ok(())
516        })
517    }
518
519    pub(crate) fn share_project(
520        &mut self,
521        project: ModelHandle<Project>,
522        cx: &mut ModelContext<Self>,
523    ) -> Task<Result<u64>> {
524        if let Some(project_id) = project.read(cx).remote_id() {
525            return Task::ready(Ok(project_id));
526        }
527
528        let request = self.client.request(proto::ShareProject {
529            room_id: self.id(),
530            worktrees: project
531                .read(cx)
532                .worktrees(cx)
533                .map(|worktree| {
534                    let worktree = worktree.read(cx);
535                    proto::WorktreeMetadata {
536                        id: worktree.id().to_proto(),
537                        root_name: worktree.root_name().into(),
538                        visible: worktree.is_visible(),
539                        abs_path: worktree.abs_path().as_os_str().as_bytes().to_vec(),
540                    }
541                })
542                .collect(),
543        });
544        cx.spawn(|this, mut cx| async move {
545            let response = request.await?;
546
547            project.update(&mut cx, |project, cx| {
548                project
549                    .shared(response.project_id, cx)
550                    .detach_and_log_err(cx)
551            });
552
553            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
554            this.update(&mut cx, |this, cx| {
555                let active_project = this.local_participant.active_project.as_ref();
556                if active_project.map_or(false, |location| *location == project) {
557                    this.set_location(Some(&project), cx)
558                } else {
559                    Task::ready(Ok(()))
560                }
561            })
562            .await?;
563
564            Ok(response.project_id)
565        })
566    }
567
568    pub fn set_location(
569        &mut self,
570        project: Option<&ModelHandle<Project>>,
571        cx: &mut ModelContext<Self>,
572    ) -> Task<Result<()>> {
573        if self.status.is_offline() {
574            return Task::ready(Err(anyhow!("room is offline")));
575        }
576
577        let client = self.client.clone();
578        let room_id = self.id;
579        let location = if let Some(project) = project {
580            self.local_participant.active_project = Some(project.downgrade());
581            if let Some(project_id) = project.read(cx).remote_id() {
582                proto::participant_location::Variant::SharedProject(
583                    proto::participant_location::SharedProject { id: project_id },
584                )
585            } else {
586                proto::participant_location::Variant::UnsharedProject(
587                    proto::participant_location::UnsharedProject {},
588                )
589            }
590        } else {
591            self.local_participant.active_project = None;
592            proto::participant_location::Variant::External(proto::participant_location::External {})
593        };
594
595        cx.notify();
596        cx.foreground().spawn(async move {
597            client
598                .request(proto::UpdateParticipantLocation {
599                    room_id,
600                    location: Some(proto::ParticipantLocation {
601                        variant: Some(location),
602                    }),
603                })
604                .await?;
605            Ok(())
606        })
607    }
608
609    pub fn is_screen_sharing(&self) -> bool {
610        self.live_kit
611            .as_ref()
612            .map_or(false, |live_kit| live_kit.screen_track.is_some())
613    }
614
615    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
616        if self.status.is_offline() {
617            return Task::ready(Err(anyhow!("room is offline")));
618        }
619
620        cx.spawn_weak(|this, mut cx| async move {
621            let displays = live_kit_client::display_sources().await?;
622            let display = displays
623                .first()
624                .ok_or_else(|| anyhow!("no display found"))?;
625            let track = LocalVideoTrack::screen_share_for_display(&display);
626            let publication = this
627                .upgrade(&cx)
628                .ok_or_else(|| anyhow!("room was dropped"))?
629                .read_with(&cx, |this, _| {
630                    this.live_kit
631                        .as_ref()
632                        .map(|live_kit| live_kit.room.publish_video_track(&track))
633                })
634                .ok_or_else(|| anyhow!("live-kit was not initialized"))?
635                .await?;
636
637            this.upgrade(&cx)
638                .ok_or_else(|| anyhow!("room was dropped"))?
639                .update(&mut cx, |this, cx| {
640                    let live_kit = this
641                        .live_kit
642                        .as_mut()
643                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
644                    live_kit.screen_track = Some(publication);
645                    cx.notify();
646                    Ok(())
647                })
648        })
649    }
650
651    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
652        if self.status.is_offline() {
653            return Err(anyhow!("room is offline"));
654        }
655
656        let live_kit = self
657            .live_kit
658            .as_mut()
659            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
660        let track = live_kit
661            .screen_track
662            .take()
663            .ok_or_else(|| anyhow!("screen was not shared"))?;
664        live_kit.room.unpublish_track(track);
665        cx.notify();
666
667        Ok(())
668    }
669}
670
671struct LiveKitRoom {
672    room: Arc<live_kit_client::Room>,
673    screen_track: Option<LocalTrackPublication>,
674    _maintain_room: Task<()>,
675}
676
/// Whether a room is still joined.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is joined and usable.
    Online,
    /// The room has been left.
    Offline,
}

impl RoomStatus {
    /// Returns `true` for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}