room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 11use postage::stream::Stream;
 12use project::Project;
 13use std::{mem, os::unix::prelude::OsStrExt, sync::Arc};
 14use util::{post_inc, ResultExt};
 15
 16#[derive(Clone, Debug, PartialEq, Eq)]
 17pub enum Event {
 18    ParticipantLocationChanged {
 19        participant_id: PeerId,
 20    },
 21    RemoteVideoTracksChanged {
 22        participant_id: PeerId,
 23    },
 24    RemoteProjectShared {
 25        owner: Arc<User>,
 26        project_id: u64,
 27        worktree_root_names: Vec<String>,
 28    },
 29    RemoteProjectUnshared {
 30        project_id: u64,
 31    },
 32    Left,
 33}
 34
 35pub struct Room {
 36    id: u64,
 37    live_kit: Option<LiveKitRoom>,
 38    status: RoomStatus,
 39    local_participant: LocalParticipant,
 40    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 41    pending_participants: Vec<Arc<User>>,
 42    participant_user_ids: HashSet<u64>,
 43    pending_call_count: usize,
 44    leave_when_empty: bool,
 45    client: Arc<Client>,
 46    user_store: ModelHandle<UserStore>,
 47    subscriptions: Vec<client::Subscription>,
 48    pending_room_update: Option<Task<()>>,
 49}
 50
 51impl Entity for Room {
 52    type Event = Event;
 53
 54    fn release(&mut self, _: &mut MutableAppContext) {
 55        if self.status.is_online() {
 56            self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 57        }
 58    }
 59}
 60
 61impl Room {
 62    fn new(
 63        id: u64,
 64        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 65        client: Arc<Client>,
 66        user_store: ModelHandle<UserStore>,
 67        cx: &mut ModelContext<Self>,
 68    ) -> Self {
 69        let mut client_status = client.status();
 70        cx.spawn_weak(|this, mut cx| async move {
 71            let is_connected = client_status
 72                .next()
 73                .await
 74                .map_or(false, |s| s.is_connected());
 75            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 76            if !is_connected || client_status.next().await.is_some() {
 77                if let Some(this) = this.upgrade(&cx) {
 78                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 79                }
 80            }
 81        })
 82        .detach();
 83
 84        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 85            let room = live_kit_client::Room::new();
 86            let mut status = room.status();
 87            // Consume the initial status of the room.
 88            let _ = status.try_recv();
 89            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 90                while let Some(status) = status.next().await {
 91                    let this = if let Some(this) = this.upgrade(&cx) {
 92                        this
 93                    } else {
 94                        break;
 95                    };
 96
 97                    if status == live_kit_client::ConnectionState::Disconnected {
 98                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 99                        break;
100                    }
101                }
102            });
103
104            let mut track_changes = room.remote_video_track_updates();
105            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
106                while let Some(track_change) = track_changes.next().await {
107                    let this = if let Some(this) = this.upgrade(&cx) {
108                        this
109                    } else {
110                        break;
111                    };
112
113                    this.update(&mut cx, |this, cx| {
114                        this.remote_video_track_updated(track_change, cx).log_err()
115                    });
116                }
117            });
118
119            cx.foreground()
120                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
121                .detach_and_log_err(cx);
122
123            Some(LiveKitRoom {
124                room,
125                screen_track: ScreenTrack::None,
126                next_publish_id: 0,
127                _maintain_room,
128                _maintain_tracks,
129            })
130        } else {
131            None
132        };
133
134        Self {
135            id,
136            live_kit: live_kit_room,
137            status: RoomStatus::Online,
138            participant_user_ids: Default::default(),
139            local_participant: Default::default(),
140            remote_participants: Default::default(),
141            pending_participants: Default::default(),
142            pending_call_count: 0,
143            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
144            leave_when_empty: false,
145            pending_room_update: None,
146            client,
147            user_store,
148        }
149    }
150
151    pub(crate) fn create(
152        recipient_user_id: u64,
153        initial_project: Option<ModelHandle<Project>>,
154        client: Arc<Client>,
155        user_store: ModelHandle<UserStore>,
156        cx: &mut MutableAppContext,
157    ) -> Task<Result<ModelHandle<Self>>> {
158        cx.spawn(|mut cx| async move {
159            let response = client.request(proto::CreateRoom {}).await?;
160            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
161            let room = cx.add_model(|cx| {
162                Self::new(
163                    room_proto.id,
164                    response.live_kit_connection_info,
165                    client,
166                    user_store,
167                    cx,
168                )
169            });
170
171            let initial_project_id = if let Some(initial_project) = initial_project {
172                let initial_project_id = room
173                    .update(&mut cx, |room, cx| {
174                        room.share_project(initial_project.clone(), cx)
175                    })
176                    .await?;
177                Some(initial_project_id)
178            } else {
179                None
180            };
181
182            match room
183                .update(&mut cx, |room, cx| {
184                    room.leave_when_empty = true;
185                    room.call(recipient_user_id, initial_project_id, cx)
186                })
187                .await
188            {
189                Ok(()) => Ok(room),
190                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
191            }
192        })
193    }
194
195    pub(crate) fn join(
196        call: &IncomingCall,
197        client: Arc<Client>,
198        user_store: ModelHandle<UserStore>,
199        cx: &mut MutableAppContext,
200    ) -> Task<Result<ModelHandle<Self>>> {
201        let room_id = call.room_id;
202        cx.spawn(|mut cx| async move {
203            let response = client.request(proto::JoinRoom { id: room_id }).await?;
204            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
205            let room = cx.add_model(|cx| {
206                Self::new(
207                    room_id,
208                    response.live_kit_connection_info,
209                    client,
210                    user_store,
211                    cx,
212                )
213            });
214            room.update(&mut cx, |room, cx| {
215                room.leave_when_empty = true;
216                room.apply_room_update(room_proto, cx)?;
217                anyhow::Ok(())
218            })?;
219            Ok(room)
220        })
221    }
222
223    fn should_leave(&self) -> bool {
224        self.leave_when_empty
225            && self.pending_room_update.is_none()
226            && self.pending_participants.is_empty()
227            && self.remote_participants.is_empty()
228            && self.pending_call_count == 0
229    }
230
231    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
232        if self.status.is_offline() {
233            return Err(anyhow!("room is offline"));
234        }
235
236        cx.notify();
237        cx.emit(Event::Left);
238        self.status = RoomStatus::Offline;
239        self.remote_participants.clear();
240        self.pending_participants.clear();
241        self.participant_user_ids.clear();
242        self.subscriptions.clear();
243        self.live_kit.take();
244        self.client.send(proto::LeaveRoom { id: self.id })?;
245        Ok(())
246    }
247
248    pub fn id(&self) -> u64 {
249        self.id
250    }
251
252    pub fn status(&self) -> RoomStatus {
253        self.status
254    }
255
256    pub fn local_participant(&self) -> &LocalParticipant {
257        &self.local_participant
258    }
259
260    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
261        &self.remote_participants
262    }
263
264    pub fn pending_participants(&self) -> &[Arc<User>] {
265        &self.pending_participants
266    }
267
268    pub fn contains_participant(&self, user_id: u64) -> bool {
269        self.participant_user_ids.contains(&user_id)
270    }
271
272    async fn handle_room_updated(
273        this: ModelHandle<Self>,
274        envelope: TypedEnvelope<proto::RoomUpdated>,
275        _: Arc<Client>,
276        mut cx: AsyncAppContext,
277    ) -> Result<()> {
278        let room = envelope
279            .payload
280            .room
281            .ok_or_else(|| anyhow!("invalid room"))?;
282        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
283    }
284
285    fn apply_room_update(
286        &mut self,
287        mut room: proto::Room,
288        cx: &mut ModelContext<Self>,
289    ) -> Result<()> {
290        // Filter ourselves out from the room's participants.
291        let local_participant_ix = room
292            .participants
293            .iter()
294            .position(|participant| Some(participant.user_id) == self.client.user_id());
295        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
296
297        let remote_participant_user_ids = room
298            .participants
299            .iter()
300            .map(|p| p.user_id)
301            .collect::<Vec<_>>();
302        let (remote_participants, pending_participants) =
303            self.user_store.update(cx, move |user_store, cx| {
304                (
305                    user_store.get_users(remote_participant_user_ids, cx),
306                    user_store.get_users(room.pending_participant_user_ids, cx),
307                )
308            });
309        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
310            let (remote_participants, pending_participants) =
311                futures::join!(remote_participants, pending_participants);
312
313            this.update(&mut cx, |this, cx| {
314                this.participant_user_ids.clear();
315
316                if let Some(participant) = local_participant {
317                    this.local_participant.projects = participant.projects;
318                } else {
319                    this.local_participant.projects.clear();
320                }
321
322                if let Some(participants) = remote_participants.log_err() {
323                    for (participant, user) in room.participants.into_iter().zip(participants) {
324                        let peer_id = PeerId(participant.peer_id);
325                        this.participant_user_ids.insert(participant.user_id);
326
327                        let old_projects = this
328                            .remote_participants
329                            .get(&peer_id)
330                            .into_iter()
331                            .flat_map(|existing| &existing.projects)
332                            .map(|project| project.id)
333                            .collect::<HashSet<_>>();
334                        let new_projects = participant
335                            .projects
336                            .iter()
337                            .map(|project| project.id)
338                            .collect::<HashSet<_>>();
339
340                        for project in &participant.projects {
341                            if !old_projects.contains(&project.id) {
342                                cx.emit(Event::RemoteProjectShared {
343                                    owner: user.clone(),
344                                    project_id: project.id,
345                                    worktree_root_names: project.worktree_root_names.clone(),
346                                });
347                            }
348                        }
349
350                        for unshared_project_id in old_projects.difference(&new_projects) {
351                            cx.emit(Event::RemoteProjectUnshared {
352                                project_id: *unshared_project_id,
353                            });
354                        }
355
356                        let location = ParticipantLocation::from_proto(participant.location)
357                            .unwrap_or(ParticipantLocation::External);
358                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
359                        {
360                            remote_participant.projects = participant.projects;
361                            if location != remote_participant.location {
362                                remote_participant.location = location;
363                                cx.emit(Event::ParticipantLocationChanged {
364                                    participant_id: peer_id,
365                                });
366                            }
367                        } else {
368                            this.remote_participants.insert(
369                                peer_id,
370                                RemoteParticipant {
371                                    user: user.clone(),
372                                    projects: participant.projects,
373                                    location,
374                                    tracks: Default::default(),
375                                },
376                            );
377
378                            if let Some(live_kit) = this.live_kit.as_ref() {
379                                let tracks =
380                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
381                                for track in tracks {
382                                    this.remote_video_track_updated(
383                                        RemoteVideoTrackUpdate::Subscribed(track),
384                                        cx,
385                                    )
386                                    .log_err();
387                                }
388                            }
389                        }
390                    }
391
392                    this.remote_participants.retain(|_, participant| {
393                        if this.participant_user_ids.contains(&participant.user.id) {
394                            true
395                        } else {
396                            for project in &participant.projects {
397                                cx.emit(Event::RemoteProjectUnshared {
398                                    project_id: project.id,
399                                });
400                            }
401                            false
402                        }
403                    });
404                }
405
406                if let Some(pending_participants) = pending_participants.log_err() {
407                    this.pending_participants = pending_participants;
408                    for participant in &this.pending_participants {
409                        this.participant_user_ids.insert(participant.id);
410                    }
411                }
412
413                this.pending_room_update.take();
414                if this.should_leave() {
415                    let _ = this.leave(cx);
416                }
417
418                this.check_invariants();
419                cx.notify();
420            });
421        }));
422
423        cx.notify();
424        Ok(())
425    }
426
427    fn remote_video_track_updated(
428        &mut self,
429        change: RemoteVideoTrackUpdate,
430        cx: &mut ModelContext<Self>,
431    ) -> Result<()> {
432        match change {
433            RemoteVideoTrackUpdate::Subscribed(track) => {
434                let peer_id = PeerId(track.publisher_id().parse()?);
435                let track_id = track.sid().to_string();
436                let participant = self
437                    .remote_participants
438                    .get_mut(&peer_id)
439                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
440                participant.tracks.insert(
441                    track_id.clone(),
442                    Arc::new(RemoteVideoTrack {
443                        live_kit_track: track,
444                    }),
445                );
446                cx.emit(Event::RemoteVideoTracksChanged {
447                    participant_id: peer_id,
448                });
449            }
450            RemoteVideoTrackUpdate::Unsubscribed {
451                publisher_id,
452                track_id,
453            } => {
454                let peer_id = PeerId(publisher_id.parse()?);
455                let participant = self
456                    .remote_participants
457                    .get_mut(&peer_id)
458                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
459                participant.tracks.remove(&track_id);
460                cx.emit(Event::RemoteVideoTracksChanged {
461                    participant_id: peer_id,
462                });
463            }
464        }
465
466        cx.notify();
467        Ok(())
468    }
469
470    fn check_invariants(&self) {
471        #[cfg(any(test, feature = "test-support"))]
472        {
473            for participant in self.remote_participants.values() {
474                assert!(self.participant_user_ids.contains(&participant.user.id));
475            }
476
477            for participant in &self.pending_participants {
478                assert!(self.participant_user_ids.contains(&participant.id));
479            }
480
481            assert_eq!(
482                self.participant_user_ids.len(),
483                self.remote_participants.len() + self.pending_participants.len()
484            );
485        }
486    }
487
488    pub(crate) fn call(
489        &mut self,
490        recipient_user_id: u64,
491        initial_project_id: Option<u64>,
492        cx: &mut ModelContext<Self>,
493    ) -> Task<Result<()>> {
494        if self.status.is_offline() {
495            return Task::ready(Err(anyhow!("room is offline")));
496        }
497
498        cx.notify();
499        let client = self.client.clone();
500        let room_id = self.id;
501        self.pending_call_count += 1;
502        cx.spawn(|this, mut cx| async move {
503            let result = client
504                .request(proto::Call {
505                    room_id,
506                    recipient_user_id,
507                    initial_project_id,
508                })
509                .await;
510            this.update(&mut cx, |this, cx| {
511                this.pending_call_count -= 1;
512                if this.should_leave() {
513                    this.leave(cx)?;
514                }
515                result
516            })?;
517            Ok(())
518        })
519    }
520
521    pub(crate) fn share_project(
522        &mut self,
523        project: ModelHandle<Project>,
524        cx: &mut ModelContext<Self>,
525    ) -> Task<Result<u64>> {
526        if let Some(project_id) = project.read(cx).remote_id() {
527            return Task::ready(Ok(project_id));
528        }
529
530        let request = self.client.request(proto::ShareProject {
531            room_id: self.id(),
532            worktrees: project
533                .read(cx)
534                .worktrees(cx)
535                .map(|worktree| {
536                    let worktree = worktree.read(cx);
537                    proto::WorktreeMetadata {
538                        id: worktree.id().to_proto(),
539                        root_name: worktree.root_name().into(),
540                        visible: worktree.is_visible(),
541                        abs_path: worktree.abs_path().as_os_str().as_bytes().to_vec(),
542                    }
543                })
544                .collect(),
545        });
546        cx.spawn(|this, mut cx| async move {
547            let response = request.await?;
548
549            project.update(&mut cx, |project, cx| {
550                project
551                    .shared(response.project_id, cx)
552                    .detach_and_log_err(cx)
553            });
554
555            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
556            this.update(&mut cx, |this, cx| {
557                let active_project = this.local_participant.active_project.as_ref();
558                if active_project.map_or(false, |location| *location == project) {
559                    this.set_location(Some(&project), cx)
560                } else {
561                    Task::ready(Ok(()))
562                }
563            })
564            .await?;
565
566            Ok(response.project_id)
567        })
568    }
569
570    pub(crate) fn set_location(
571        &mut self,
572        project: Option<&ModelHandle<Project>>,
573        cx: &mut ModelContext<Self>,
574    ) -> Task<Result<()>> {
575        if self.status.is_offline() {
576            return Task::ready(Err(anyhow!("room is offline")));
577        }
578
579        let client = self.client.clone();
580        let room_id = self.id;
581        let location = if let Some(project) = project {
582            self.local_participant.active_project = Some(project.downgrade());
583            if let Some(project_id) = project.read(cx).remote_id() {
584                proto::participant_location::Variant::SharedProject(
585                    proto::participant_location::SharedProject { id: project_id },
586                )
587            } else {
588                proto::participant_location::Variant::UnsharedProject(
589                    proto::participant_location::UnsharedProject {},
590                )
591            }
592        } else {
593            self.local_participant.active_project = None;
594            proto::participant_location::Variant::External(proto::participant_location::External {})
595        };
596
597        cx.notify();
598        cx.foreground().spawn(async move {
599            client
600                .request(proto::UpdateParticipantLocation {
601                    room_id,
602                    location: Some(proto::ParticipantLocation {
603                        variant: Some(location),
604                    }),
605                })
606                .await?;
607            Ok(())
608        })
609    }
610
611    pub fn is_screen_sharing(&self) -> bool {
612        self.live_kit.as_ref().map_or(false, |live_kit| {
613            !matches!(live_kit.screen_track, ScreenTrack::None)
614        })
615    }
616
617    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
618        if self.status.is_offline() {
619            return Task::ready(Err(anyhow!("room is offline")));
620        } else if self.is_screen_sharing() {
621            return Task::ready(Err(anyhow!("screen was already shared")));
622        }
623
624        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
625            let publish_id = post_inc(&mut live_kit.next_publish_id);
626            live_kit.screen_track = ScreenTrack::Pending { publish_id };
627            cx.notify();
628            (live_kit.room.display_sources(), publish_id)
629        } else {
630            return Task::ready(Err(anyhow!("live-kit was not initialized")));
631        };
632
633        cx.spawn_weak(|this, mut cx| async move {
634            let publish_track = async {
635                let displays = displays.await?;
636                let display = displays
637                    .first()
638                    .ok_or_else(|| anyhow!("no display found"))?;
639                let track = LocalVideoTrack::screen_share_for_display(&display);
640                this.upgrade(&cx)
641                    .ok_or_else(|| anyhow!("room was dropped"))?
642                    .read_with(&cx, |this, _| {
643                        this.live_kit
644                            .as_ref()
645                            .map(|live_kit| live_kit.room.publish_video_track(&track))
646                    })
647                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
648                    .await
649            };
650
651            let publication = publish_track.await;
652            this.upgrade(&cx)
653                .ok_or_else(|| anyhow!("room was dropped"))?
654                .update(&mut cx, |this, cx| {
655                    let live_kit = this
656                        .live_kit
657                        .as_mut()
658                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
659
660                    let canceled = if let ScreenTrack::Pending {
661                        publish_id: cur_publish_id,
662                    } = &live_kit.screen_track
663                    {
664                        *cur_publish_id != publish_id
665                    } else {
666                        true
667                    };
668
669                    match publication {
670                        Ok(publication) => {
671                            if canceled {
672                                live_kit.room.unpublish_track(publication);
673                            } else {
674                                live_kit.screen_track = ScreenTrack::Published(publication);
675                                cx.notify();
676                            }
677                            Ok(())
678                        }
679                        Err(error) => {
680                            if canceled {
681                                Ok(())
682                            } else {
683                                live_kit.screen_track = ScreenTrack::None;
684                                cx.notify();
685                                Err(error)
686                            }
687                        }
688                    }
689                })
690        })
691    }
692
693    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
694        if self.status.is_offline() {
695            return Err(anyhow!("room is offline"));
696        }
697
698        let live_kit = self
699            .live_kit
700            .as_mut()
701            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
702        match mem::take(&mut live_kit.screen_track) {
703            ScreenTrack::None => Err(anyhow!("screen was not shared")),
704            ScreenTrack::Pending { .. } => {
705                cx.notify();
706                Ok(())
707            }
708            ScreenTrack::Published(track) => {
709                live_kit.room.unpublish_track(track);
710                cx.notify();
711                Ok(())
712            }
713        }
714    }
715
716    #[cfg(any(test, feature = "test-support"))]
717    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
718        self.live_kit
719            .as_ref()
720            .unwrap()
721            .room
722            .set_display_sources(sources);
723    }
724}
725
726struct LiveKitRoom {
727    room: Arc<live_kit_client::Room>,
728    screen_track: ScreenTrack,
729    next_publish_id: usize,
730    _maintain_room: Task<()>,
731    _maintain_tracks: Task<()>,
732}
733
734enum ScreenTrack {
735    None,
736    Pending { publish_id: usize },
737    Published(LocalTrackPublication),
738}
739
740impl Default for ScreenTrack {
741    fn default() -> Self {
742        Self::None
743    }
744}
745
/// Whether a [`Room`] is still active (`Online`) or has been left (`Offline`).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Offline,
}

impl RoomStatus {
    /// Returns `true` for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}