room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 11use postage::stream::Stream;
 12use project::Project;
 13use std::{mem, sync::Arc};
 14use util::{post_inc, ResultExt};
 15
 16#[derive(Clone, Debug, PartialEq, Eq)]
 17pub enum Event {
 18    ParticipantLocationChanged {
 19        participant_id: PeerId,
 20    },
 21    RemoteVideoTracksChanged {
 22        participant_id: PeerId,
 23    },
 24    RemoteProjectShared {
 25        owner: Arc<User>,
 26        project_id: u64,
 27        worktree_root_names: Vec<String>,
 28    },
 29    RemoteProjectUnshared {
 30        project_id: u64,
 31    },
 32    Left,
 33}
 34
 35pub struct Room {
 36    id: u64,
 37    live_kit: Option<LiveKitRoom>,
 38    status: RoomStatus,
 39    local_participant: LocalParticipant,
 40    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 41    pending_participants: Vec<Arc<User>>,
 42    participant_user_ids: HashSet<u64>,
 43    pending_call_count: usize,
 44    leave_when_empty: bool,
 45    client: Arc<Client>,
 46    user_store: ModelHandle<UserStore>,
 47    subscriptions: Vec<client::Subscription>,
 48    pending_room_update: Option<Task<()>>,
 49}
 50
 51impl Entity for Room {
 52    type Event = Event;
 53
 54    fn release(&mut self, _: &mut MutableAppContext) {
 55        if self.status.is_online() {
 56            self.client.send(proto::LeaveRoom {}).log_err();
 57        }
 58    }
 59}
 60
 61impl Room {
 62    fn new(
 63        id: u64,
 64        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 65        client: Arc<Client>,
 66        user_store: ModelHandle<UserStore>,
 67        cx: &mut ModelContext<Self>,
 68    ) -> Self {
 69        let mut client_status = client.status();
 70        cx.spawn_weak(|this, mut cx| async move {
 71            let is_connected = client_status
 72                .next()
 73                .await
 74                .map_or(false, |s| s.is_connected());
 75            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 76            if !is_connected || client_status.next().await.is_some() {
 77                if let Some(this) = this.upgrade(&cx) {
 78                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 79                }
 80            }
 81        })
 82        .detach();
 83
 84        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 85            let room = live_kit_client::Room::new();
 86            let mut status = room.status();
 87            // Consume the initial status of the room.
 88            let _ = status.try_recv();
 89            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 90                while let Some(status) = status.next().await {
 91                    let this = if let Some(this) = this.upgrade(&cx) {
 92                        this
 93                    } else {
 94                        break;
 95                    };
 96
 97                    if status == live_kit_client::ConnectionState::Disconnected {
 98                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 99                        break;
100                    }
101                }
102            });
103
104            let mut track_changes = room.remote_video_track_updates();
105            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
106                while let Some(track_change) = track_changes.next().await {
107                    let this = if let Some(this) = this.upgrade(&cx) {
108                        this
109                    } else {
110                        break;
111                    };
112
113                    this.update(&mut cx, |this, cx| {
114                        this.remote_video_track_updated(track_change, cx).log_err()
115                    });
116                }
117            });
118
119            cx.foreground()
120                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
121                .detach_and_log_err(cx);
122
123            Some(LiveKitRoom {
124                room,
125                screen_track: ScreenTrack::None,
126                next_publish_id: 0,
127                _maintain_room,
128                _maintain_tracks,
129            })
130        } else {
131            None
132        };
133
134        Self {
135            id,
136            live_kit: live_kit_room,
137            status: RoomStatus::Online,
138            participant_user_ids: Default::default(),
139            local_participant: Default::default(),
140            remote_participants: Default::default(),
141            pending_participants: Default::default(),
142            pending_call_count: 0,
143            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
144            leave_when_empty: false,
145            pending_room_update: None,
146            client,
147            user_store,
148        }
149    }
150
151    pub(crate) fn create(
152        called_user_id: u64,
153        initial_project: Option<ModelHandle<Project>>,
154        client: Arc<Client>,
155        user_store: ModelHandle<UserStore>,
156        cx: &mut MutableAppContext,
157    ) -> Task<Result<ModelHandle<Self>>> {
158        cx.spawn(|mut cx| async move {
159            let response = client.request(proto::CreateRoom {}).await?;
160            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
161            let room = cx.add_model(|cx| {
162                Self::new(
163                    room_proto.id,
164                    response.live_kit_connection_info,
165                    client,
166                    user_store,
167                    cx,
168                )
169            });
170
171            let initial_project_id = if let Some(initial_project) = initial_project {
172                let initial_project_id = room
173                    .update(&mut cx, |room, cx| {
174                        room.share_project(initial_project.clone(), cx)
175                    })
176                    .await?;
177                Some(initial_project_id)
178            } else {
179                None
180            };
181
182            match room
183                .update(&mut cx, |room, cx| {
184                    room.leave_when_empty = true;
185                    room.call(called_user_id, initial_project_id, cx)
186                })
187                .await
188            {
189                Ok(()) => Ok(room),
190                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
191            }
192        })
193    }
194
195    pub(crate) fn join(
196        call: &IncomingCall,
197        client: Arc<Client>,
198        user_store: ModelHandle<UserStore>,
199        cx: &mut MutableAppContext,
200    ) -> Task<Result<ModelHandle<Self>>> {
201        let room_id = call.room_id;
202        cx.spawn(|mut cx| async move {
203            let response = client.request(proto::JoinRoom { id: room_id }).await?;
204            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
205            let room = cx.add_model(|cx| {
206                Self::new(
207                    room_id,
208                    response.live_kit_connection_info,
209                    client,
210                    user_store,
211                    cx,
212                )
213            });
214            room.update(&mut cx, |room, cx| {
215                room.leave_when_empty = true;
216                room.apply_room_update(room_proto, cx)?;
217                anyhow::Ok(())
218            })?;
219            Ok(room)
220        })
221    }
222
223    fn should_leave(&self) -> bool {
224        self.leave_when_empty
225            && self.pending_room_update.is_none()
226            && self.pending_participants.is_empty()
227            && self.remote_participants.is_empty()
228            && self.pending_call_count == 0
229    }
230
231    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
232        if self.status.is_offline() {
233            return Err(anyhow!("room is offline"));
234        }
235
236        cx.notify();
237        cx.emit(Event::Left);
238        self.status = RoomStatus::Offline;
239        self.remote_participants.clear();
240        self.pending_participants.clear();
241        self.participant_user_ids.clear();
242        self.subscriptions.clear();
243        self.live_kit.take();
244        self.client.send(proto::LeaveRoom {})?;
245        Ok(())
246    }
247
248    pub fn id(&self) -> u64 {
249        self.id
250    }
251
252    pub fn status(&self) -> RoomStatus {
253        self.status
254    }
255
256    pub fn local_participant(&self) -> &LocalParticipant {
257        &self.local_participant
258    }
259
260    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
261        &self.remote_participants
262    }
263
264    pub fn pending_participants(&self) -> &[Arc<User>] {
265        &self.pending_participants
266    }
267
268    pub fn contains_participant(&self, user_id: u64) -> bool {
269        self.participant_user_ids.contains(&user_id)
270    }
271
272    async fn handle_room_updated(
273        this: ModelHandle<Self>,
274        envelope: TypedEnvelope<proto::RoomUpdated>,
275        _: Arc<Client>,
276        mut cx: AsyncAppContext,
277    ) -> Result<()> {
278        let room = envelope
279            .payload
280            .room
281            .ok_or_else(|| anyhow!("invalid room"))?;
282        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
283    }
284
285    fn apply_room_update(
286        &mut self,
287        mut room: proto::Room,
288        cx: &mut ModelContext<Self>,
289    ) -> Result<()> {
290        // Filter ourselves out from the room's participants.
291        let local_participant_ix = room
292            .participants
293            .iter()
294            .position(|participant| Some(participant.user_id) == self.client.user_id());
295        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
296
297        let pending_participant_user_ids = room
298            .pending_participants
299            .iter()
300            .map(|p| p.user_id)
301            .collect::<Vec<_>>();
302        let remote_participant_user_ids = room
303            .participants
304            .iter()
305            .map(|p| p.user_id)
306            .collect::<Vec<_>>();
307        let (remote_participants, pending_participants) =
308            self.user_store.update(cx, move |user_store, cx| {
309                (
310                    user_store.get_users(remote_participant_user_ids, cx),
311                    user_store.get_users(pending_participant_user_ids, cx),
312                )
313            });
314        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
315            let (remote_participants, pending_participants) =
316                futures::join!(remote_participants, pending_participants);
317
318            this.update(&mut cx, |this, cx| {
319                this.participant_user_ids.clear();
320
321                if let Some(participant) = local_participant {
322                    this.local_participant.projects = participant.projects;
323                } else {
324                    this.local_participant.projects.clear();
325                }
326
327                if let Some(participants) = remote_participants.log_err() {
328                    for (participant, user) in room.participants.into_iter().zip(participants) {
329                        let peer_id = PeerId(participant.peer_id);
330                        this.participant_user_ids.insert(participant.user_id);
331
332                        let old_projects = this
333                            .remote_participants
334                            .get(&peer_id)
335                            .into_iter()
336                            .flat_map(|existing| &existing.projects)
337                            .map(|project| project.id)
338                            .collect::<HashSet<_>>();
339                        let new_projects = participant
340                            .projects
341                            .iter()
342                            .map(|project| project.id)
343                            .collect::<HashSet<_>>();
344
345                        for project in &participant.projects {
346                            if !old_projects.contains(&project.id) {
347                                cx.emit(Event::RemoteProjectShared {
348                                    owner: user.clone(),
349                                    project_id: project.id,
350                                    worktree_root_names: project.worktree_root_names.clone(),
351                                });
352                            }
353                        }
354
355                        for unshared_project_id in old_projects.difference(&new_projects) {
356                            cx.emit(Event::RemoteProjectUnshared {
357                                project_id: *unshared_project_id,
358                            });
359                        }
360
361                        let location = ParticipantLocation::from_proto(participant.location)
362                            .unwrap_or(ParticipantLocation::External);
363                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
364                        {
365                            remote_participant.projects = participant.projects;
366                            if location != remote_participant.location {
367                                remote_participant.location = location;
368                                cx.emit(Event::ParticipantLocationChanged {
369                                    participant_id: peer_id,
370                                });
371                            }
372                        } else {
373                            this.remote_participants.insert(
374                                peer_id,
375                                RemoteParticipant {
376                                    user: user.clone(),
377                                    projects: participant.projects,
378                                    location,
379                                    tracks: Default::default(),
380                                },
381                            );
382
383                            if let Some(live_kit) = this.live_kit.as_ref() {
384                                let tracks =
385                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
386                                for track in tracks {
387                                    this.remote_video_track_updated(
388                                        RemoteVideoTrackUpdate::Subscribed(track),
389                                        cx,
390                                    )
391                                    .log_err();
392                                }
393                            }
394                        }
395                    }
396
397                    this.remote_participants.retain(|_, participant| {
398                        if this.participant_user_ids.contains(&participant.user.id) {
399                            true
400                        } else {
401                            for project in &participant.projects {
402                                cx.emit(Event::RemoteProjectUnshared {
403                                    project_id: project.id,
404                                });
405                            }
406                            false
407                        }
408                    });
409                }
410
411                if let Some(pending_participants) = pending_participants.log_err() {
412                    this.pending_participants = pending_participants;
413                    for participant in &this.pending_participants {
414                        this.participant_user_ids.insert(participant.id);
415                    }
416                }
417
418                this.pending_room_update.take();
419                if this.should_leave() {
420                    let _ = this.leave(cx);
421                }
422
423                this.check_invariants();
424                cx.notify();
425            });
426        }));
427
428        cx.notify();
429        Ok(())
430    }
431
432    fn remote_video_track_updated(
433        &mut self,
434        change: RemoteVideoTrackUpdate,
435        cx: &mut ModelContext<Self>,
436    ) -> Result<()> {
437        match change {
438            RemoteVideoTrackUpdate::Subscribed(track) => {
439                let peer_id = PeerId(track.publisher_id().parse()?);
440                let track_id = track.sid().to_string();
441                let participant = self
442                    .remote_participants
443                    .get_mut(&peer_id)
444                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
445                participant.tracks.insert(
446                    track_id.clone(),
447                    Arc::new(RemoteVideoTrack {
448                        live_kit_track: track,
449                    }),
450                );
451                cx.emit(Event::RemoteVideoTracksChanged {
452                    participant_id: peer_id,
453                });
454            }
455            RemoteVideoTrackUpdate::Unsubscribed {
456                publisher_id,
457                track_id,
458            } => {
459                let peer_id = PeerId(publisher_id.parse()?);
460                let participant = self
461                    .remote_participants
462                    .get_mut(&peer_id)
463                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
464                participant.tracks.remove(&track_id);
465                cx.emit(Event::RemoteVideoTracksChanged {
466                    participant_id: peer_id,
467                });
468            }
469        }
470
471        cx.notify();
472        Ok(())
473    }
474
475    fn check_invariants(&self) {
476        #[cfg(any(test, feature = "test-support"))]
477        {
478            for participant in self.remote_participants.values() {
479                assert!(self.participant_user_ids.contains(&participant.user.id));
480            }
481
482            for participant in &self.pending_participants {
483                assert!(self.participant_user_ids.contains(&participant.id));
484            }
485
486            assert_eq!(
487                self.participant_user_ids.len(),
488                self.remote_participants.len() + self.pending_participants.len()
489            );
490        }
491    }
492
493    pub(crate) fn call(
494        &mut self,
495        called_user_id: u64,
496        initial_project_id: Option<u64>,
497        cx: &mut ModelContext<Self>,
498    ) -> Task<Result<()>> {
499        if self.status.is_offline() {
500            return Task::ready(Err(anyhow!("room is offline")));
501        }
502
503        cx.notify();
504        let client = self.client.clone();
505        let room_id = self.id;
506        self.pending_call_count += 1;
507        cx.spawn(|this, mut cx| async move {
508            let result = client
509                .request(proto::Call {
510                    room_id,
511                    called_user_id,
512                    initial_project_id,
513                })
514                .await;
515            this.update(&mut cx, |this, cx| {
516                this.pending_call_count -= 1;
517                if this.should_leave() {
518                    this.leave(cx)?;
519                }
520                result
521            })?;
522            Ok(())
523        })
524    }
525
526    pub(crate) fn share_project(
527        &mut self,
528        project: ModelHandle<Project>,
529        cx: &mut ModelContext<Self>,
530    ) -> Task<Result<u64>> {
531        if let Some(project_id) = project.read(cx).remote_id() {
532            return Task::ready(Ok(project_id));
533        }
534
535        let request = self.client.request(proto::ShareProject {
536            room_id: self.id(),
537            worktrees: project
538                .read(cx)
539                .worktrees(cx)
540                .map(|worktree| {
541                    let worktree = worktree.read(cx);
542                    proto::WorktreeMetadata {
543                        id: worktree.id().to_proto(),
544                        root_name: worktree.root_name().into(),
545                        visible: worktree.is_visible(),
546                        abs_path: worktree.abs_path().to_string_lossy().into(),
547                    }
548                })
549                .collect(),
550        });
551        cx.spawn(|this, mut cx| async move {
552            let response = request.await?;
553
554            project.update(&mut cx, |project, cx| {
555                project
556                    .shared(response.project_id, cx)
557                    .detach_and_log_err(cx)
558            });
559
560            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
561            this.update(&mut cx, |this, cx| {
562                let active_project = this.local_participant.active_project.as_ref();
563                if active_project.map_or(false, |location| *location == project) {
564                    this.set_location(Some(&project), cx)
565                } else {
566                    Task::ready(Ok(()))
567                }
568            })
569            .await?;
570
571            Ok(response.project_id)
572        })
573    }
574
575    pub(crate) fn set_location(
576        &mut self,
577        project: Option<&ModelHandle<Project>>,
578        cx: &mut ModelContext<Self>,
579    ) -> Task<Result<()>> {
580        if self.status.is_offline() {
581            return Task::ready(Err(anyhow!("room is offline")));
582        }
583
584        let client = self.client.clone();
585        let room_id = self.id;
586        let location = if let Some(project) = project {
587            self.local_participant.active_project = Some(project.downgrade());
588            if let Some(project_id) = project.read(cx).remote_id() {
589                proto::participant_location::Variant::SharedProject(
590                    proto::participant_location::SharedProject { id: project_id },
591                )
592            } else {
593                proto::participant_location::Variant::UnsharedProject(
594                    proto::participant_location::UnsharedProject {},
595                )
596            }
597        } else {
598            self.local_participant.active_project = None;
599            proto::participant_location::Variant::External(proto::participant_location::External {})
600        };
601
602        cx.notify();
603        cx.foreground().spawn(async move {
604            client
605                .request(proto::UpdateParticipantLocation {
606                    room_id,
607                    location: Some(proto::ParticipantLocation {
608                        variant: Some(location),
609                    }),
610                })
611                .await?;
612            Ok(())
613        })
614    }
615
616    pub fn is_screen_sharing(&self) -> bool {
617        self.live_kit.as_ref().map_or(false, |live_kit| {
618            !matches!(live_kit.screen_track, ScreenTrack::None)
619        })
620    }
621
622    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
623        if self.status.is_offline() {
624            return Task::ready(Err(anyhow!("room is offline")));
625        } else if self.is_screen_sharing() {
626            return Task::ready(Err(anyhow!("screen was already shared")));
627        }
628
629        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
630            let publish_id = post_inc(&mut live_kit.next_publish_id);
631            live_kit.screen_track = ScreenTrack::Pending { publish_id };
632            cx.notify();
633            (live_kit.room.display_sources(), publish_id)
634        } else {
635            return Task::ready(Err(anyhow!("live-kit was not initialized")));
636        };
637
638        cx.spawn_weak(|this, mut cx| async move {
639            let publish_track = async {
640                let displays = displays.await?;
641                let display = displays
642                    .first()
643                    .ok_or_else(|| anyhow!("no display found"))?;
644                let track = LocalVideoTrack::screen_share_for_display(&display);
645                this.upgrade(&cx)
646                    .ok_or_else(|| anyhow!("room was dropped"))?
647                    .read_with(&cx, |this, _| {
648                        this.live_kit
649                            .as_ref()
650                            .map(|live_kit| live_kit.room.publish_video_track(&track))
651                    })
652                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
653                    .await
654            };
655
656            let publication = publish_track.await;
657            this.upgrade(&cx)
658                .ok_or_else(|| anyhow!("room was dropped"))?
659                .update(&mut cx, |this, cx| {
660                    let live_kit = this
661                        .live_kit
662                        .as_mut()
663                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
664
665                    let canceled = if let ScreenTrack::Pending {
666                        publish_id: cur_publish_id,
667                    } = &live_kit.screen_track
668                    {
669                        *cur_publish_id != publish_id
670                    } else {
671                        true
672                    };
673
674                    match publication {
675                        Ok(publication) => {
676                            if canceled {
677                                live_kit.room.unpublish_track(publication);
678                            } else {
679                                live_kit.screen_track = ScreenTrack::Published(publication);
680                                cx.notify();
681                            }
682                            Ok(())
683                        }
684                        Err(error) => {
685                            if canceled {
686                                Ok(())
687                            } else {
688                                live_kit.screen_track = ScreenTrack::None;
689                                cx.notify();
690                                Err(error)
691                            }
692                        }
693                    }
694                })
695        })
696    }
697
698    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
699        if self.status.is_offline() {
700            return Err(anyhow!("room is offline"));
701        }
702
703        let live_kit = self
704            .live_kit
705            .as_mut()
706            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
707        match mem::take(&mut live_kit.screen_track) {
708            ScreenTrack::None => Err(anyhow!("screen was not shared")),
709            ScreenTrack::Pending { .. } => {
710                cx.notify();
711                Ok(())
712            }
713            ScreenTrack::Published(track) => {
714                live_kit.room.unpublish_track(track);
715                cx.notify();
716                Ok(())
717            }
718        }
719    }
720
721    #[cfg(any(test, feature = "test-support"))]
722    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
723        self.live_kit
724            .as_ref()
725            .unwrap()
726            .room
727            .set_display_sources(sources);
728    }
729}
730
731struct LiveKitRoom {
732    room: Arc<live_kit_client::Room>,
733    screen_track: ScreenTrack,
734    next_publish_id: usize,
735    _maintain_room: Task<()>,
736    _maintain_tracks: Task<()>,
737}
738
739enum ScreenTrack {
740    None,
741    Pending { publish_id: usize },
742    Published(LocalTrackPublication),
743}
744
745impl Default for ScreenTrack {
746    fn default() -> Self {
747        Self::None
748    }
749}
750
751#[derive(Copy, Clone, PartialEq, Eq)]
752pub enum RoomStatus {
753    Online,
754    Offline,
755}
756
757impl RoomStatus {
758    pub fn is_offline(&self) -> bool {
759        matches!(self, RoomStatus::Offline)
760    }
761
762    pub fn is_online(&self) -> bool {
763        matches!(self, RoomStatus::Online)
764    }
765}