room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate};
 11use postage::watch;
 12use project::Project;
 13use std::sync::Arc;
 14use util::ResultExt;
 15
 16#[derive(Clone, Debug, PartialEq, Eq)]
 17pub enum Event {
 18    Frame {
 19        participant_id: PeerId,
 20        track_id: live_kit_client::Sid,
 21    },
 22    RemoteProjectShared {
 23        owner: Arc<User>,
 24        project_id: u64,
 25        worktree_root_names: Vec<String>,
 26    },
 27    RemoteProjectUnshared {
 28        project_id: u64,
 29    },
 30    Left,
 31}
 32
 33pub struct Room {
 34    id: u64,
 35    live_kit_room: Option<(Arc<live_kit_client::Room>, Task<()>)>,
 36    status: RoomStatus,
 37    local_participant: LocalParticipant,
 38    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 39    pending_participants: Vec<Arc<User>>,
 40    participant_user_ids: HashSet<u64>,
 41    pending_call_count: usize,
 42    leave_when_empty: bool,
 43    client: Arc<Client>,
 44    user_store: ModelHandle<UserStore>,
 45    subscriptions: Vec<client::Subscription>,
 46    pending_room_update: Option<Task<()>>,
 47}
 48
 49impl Entity for Room {
 50    type Event = Event;
 51
 52    fn release(&mut self, _: &mut MutableAppContext) {
 53        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 54    }
 55}
 56
 57impl Room {
 58    fn new(
 59        id: u64,
 60        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 61        client: Arc<Client>,
 62        user_store: ModelHandle<UserStore>,
 63        cx: &mut ModelContext<Self>,
 64    ) -> Self {
 65        let mut client_status = client.status();
 66        cx.spawn_weak(|this, mut cx| async move {
 67            let is_connected = client_status
 68                .next()
 69                .await
 70                .map_or(false, |s| s.is_connected());
 71            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 72            if !is_connected || client_status.next().await.is_some() {
 73                if let Some(this) = this.upgrade(&cx) {
 74                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 75                }
 76            }
 77        })
 78        .detach();
 79
 80        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 81            let room = live_kit_client::Room::new();
 82            let mut track_changes = room.remote_video_track_updates();
 83            let maintain_room = cx.spawn_weak(|this, mut cx| async move {
 84                while let Some(track_change) = track_changes.next().await {
 85                    let this = if let Some(this) = this.upgrade(&cx) {
 86                        this
 87                    } else {
 88                        break;
 89                    };
 90
 91                    this.update(&mut cx, |this, cx| {
 92                        this.remote_video_track_updated(track_change, cx).log_err()
 93                    });
 94                }
 95            });
 96            cx.foreground()
 97                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 98                .detach_and_log_err(cx);
 99            Some((room, maintain_room))
100        } else {
101            None
102        };
103
104        Self {
105            id,
106            live_kit_room,
107            status: RoomStatus::Online,
108            participant_user_ids: Default::default(),
109            local_participant: Default::default(),
110            remote_participants: Default::default(),
111            pending_participants: Default::default(),
112            pending_call_count: 0,
113            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
114            leave_when_empty: false,
115            pending_room_update: None,
116            client,
117            user_store,
118        }
119    }
120
121    pub(crate) fn create(
122        recipient_user_id: u64,
123        initial_project: Option<ModelHandle<Project>>,
124        client: Arc<Client>,
125        user_store: ModelHandle<UserStore>,
126        cx: &mut MutableAppContext,
127    ) -> Task<Result<ModelHandle<Self>>> {
128        cx.spawn(|mut cx| async move {
129            let response = client.request(proto::CreateRoom {}).await?;
130            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
131            let room = cx.add_model(|cx| {
132                Self::new(
133                    room_proto.id,
134                    response.live_kit_connection_info,
135                    client,
136                    user_store,
137                    cx,
138                )
139            });
140
141            let initial_project_id = if let Some(initial_project) = initial_project {
142                let initial_project_id = room
143                    .update(&mut cx, |room, cx| {
144                        room.share_project(initial_project.clone(), cx)
145                    })
146                    .await?;
147                Some(initial_project_id)
148            } else {
149                None
150            };
151
152            match room
153                .update(&mut cx, |room, cx| {
154                    room.leave_when_empty = true;
155                    room.call(recipient_user_id, initial_project_id, cx)
156                })
157                .await
158            {
159                Ok(()) => Ok(room),
160                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
161            }
162        })
163    }
164
165    pub(crate) fn join(
166        call: &IncomingCall,
167        client: Arc<Client>,
168        user_store: ModelHandle<UserStore>,
169        cx: &mut MutableAppContext,
170    ) -> Task<Result<ModelHandle<Self>>> {
171        let room_id = call.room_id;
172        cx.spawn(|mut cx| async move {
173            let response = client.request(proto::JoinRoom { id: room_id }).await?;
174            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
175            let room = cx.add_model(|cx| {
176                Self::new(
177                    room_id,
178                    response.live_kit_connection_info,
179                    client,
180                    user_store,
181                    cx,
182                )
183            });
184            room.update(&mut cx, |room, cx| {
185                room.leave_when_empty = true;
186                room.apply_room_update(room_proto, cx)?;
187                anyhow::Ok(())
188            })?;
189            Ok(room)
190        })
191    }
192
193    fn should_leave(&self) -> bool {
194        self.leave_when_empty
195            && self.pending_room_update.is_none()
196            && self.pending_participants.is_empty()
197            && self.remote_participants.is_empty()
198            && self.pending_call_count == 0
199    }
200
201    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
202        if self.status.is_offline() {
203            return Err(anyhow!("room is offline"));
204        }
205
206        cx.notify();
207        cx.emit(Event::Left);
208        self.status = RoomStatus::Offline;
209        self.remote_participants.clear();
210        self.pending_participants.clear();
211        self.participant_user_ids.clear();
212        self.subscriptions.clear();
213        self.client.send(proto::LeaveRoom { id: self.id })?;
214        Ok(())
215    }
216
217    pub fn id(&self) -> u64 {
218        self.id
219    }
220
221    pub fn status(&self) -> RoomStatus {
222        self.status
223    }
224
225    pub fn local_participant(&self) -> &LocalParticipant {
226        &self.local_participant
227    }
228
229    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
230        &self.remote_participants
231    }
232
233    pub fn pending_participants(&self) -> &[Arc<User>] {
234        &self.pending_participants
235    }
236
237    pub fn contains_participant(&self, user_id: u64) -> bool {
238        self.participant_user_ids.contains(&user_id)
239    }
240
241    async fn handle_room_updated(
242        this: ModelHandle<Self>,
243        envelope: TypedEnvelope<proto::RoomUpdated>,
244        _: Arc<Client>,
245        mut cx: AsyncAppContext,
246    ) -> Result<()> {
247        let room = envelope
248            .payload
249            .room
250            .ok_or_else(|| anyhow!("invalid room"))?;
251        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
252    }
253
254    fn apply_room_update(
255        &mut self,
256        mut room: proto::Room,
257        cx: &mut ModelContext<Self>,
258    ) -> Result<()> {
259        // Filter ourselves out from the room's participants.
260        let local_participant_ix = room
261            .participants
262            .iter()
263            .position(|participant| Some(participant.user_id) == self.client.user_id());
264        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
265
266        let remote_participant_user_ids = room
267            .participants
268            .iter()
269            .map(|p| p.user_id)
270            .collect::<Vec<_>>();
271        let (remote_participants, pending_participants) =
272            self.user_store.update(cx, move |user_store, cx| {
273                (
274                    user_store.get_users(remote_participant_user_ids, cx),
275                    user_store.get_users(room.pending_participant_user_ids, cx),
276                )
277            });
278        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
279            let (remote_participants, pending_participants) =
280                futures::join!(remote_participants, pending_participants);
281
282            this.update(&mut cx, |this, cx| {
283                this.participant_user_ids.clear();
284
285                if let Some(participant) = local_participant {
286                    this.local_participant.projects = participant.projects;
287                } else {
288                    this.local_participant.projects.clear();
289                }
290
291                if let Some(participants) = remote_participants.log_err() {
292                    for (participant, user) in room.participants.into_iter().zip(participants) {
293                        let peer_id = PeerId(participant.peer_id);
294                        this.participant_user_ids.insert(participant.user_id);
295
296                        let old_projects = this
297                            .remote_participants
298                            .get(&peer_id)
299                            .into_iter()
300                            .flat_map(|existing| &existing.projects)
301                            .map(|project| project.id)
302                            .collect::<HashSet<_>>();
303                        let new_projects = participant
304                            .projects
305                            .iter()
306                            .map(|project| project.id)
307                            .collect::<HashSet<_>>();
308
309                        for project in &participant.projects {
310                            if !old_projects.contains(&project.id) {
311                                cx.emit(Event::RemoteProjectShared {
312                                    owner: user.clone(),
313                                    project_id: project.id,
314                                    worktree_root_names: project.worktree_root_names.clone(),
315                                });
316                            }
317                        }
318
319                        for unshared_project_id in old_projects.difference(&new_projects) {
320                            cx.emit(Event::RemoteProjectUnshared {
321                                project_id: *unshared_project_id,
322                            });
323                        }
324
325                        let location = ParticipantLocation::from_proto(participant.location)
326                            .unwrap_or(ParticipantLocation::External);
327                        if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
328                        {
329                            remote_participant.projects = participant.projects;
330                            remote_participant.location = location;
331                        } else {
332                            this.remote_participants.insert(
333                                peer_id,
334                                RemoteParticipant {
335                                    user: user.clone(),
336                                    projects: participant.projects,
337                                    location,
338                                    tracks: Default::default(),
339                                },
340                            );
341
342                            if let Some((room, _)) = this.live_kit_room.as_ref() {
343                                let tracks = room.remote_video_tracks(&peer_id.0.to_string());
344                                for track in tracks {
345                                    this.remote_video_track_updated(
346                                        RemoteVideoTrackUpdate::Subscribed(track),
347                                        cx,
348                                    )
349                                    .log_err();
350                                }
351                            }
352                        }
353                    }
354
355                    this.remote_participants.retain(|_, participant| {
356                        if this.participant_user_ids.contains(&participant.user.id) {
357                            true
358                        } else {
359                            for project in &participant.projects {
360                                cx.emit(Event::RemoteProjectUnshared {
361                                    project_id: project.id,
362                                });
363                            }
364                            false
365                        }
366                    });
367                }
368
369                if let Some(pending_participants) = pending_participants.log_err() {
370                    this.pending_participants = pending_participants;
371                    for participant in &this.pending_participants {
372                        this.participant_user_ids.insert(participant.id);
373                    }
374                }
375
376                this.pending_room_update.take();
377                if this.should_leave() {
378                    let _ = this.leave(cx);
379                }
380
381                this.check_invariants();
382                cx.notify();
383            });
384        }));
385
386        cx.notify();
387        Ok(())
388    }
389
390    fn remote_video_track_updated(
391        &mut self,
392        change: RemoteVideoTrackUpdate,
393        cx: &mut ModelContext<Self>,
394    ) -> Result<()> {
395        match change {
396            RemoteVideoTrackUpdate::Subscribed(track) => {
397                let peer_id = PeerId(track.publisher_id().parse()?);
398                let track_id = track.sid().to_string();
399                let participant = self
400                    .remote_participants
401                    .get_mut(&peer_id)
402                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
403                let (mut tx, mut rx) = watch::channel();
404                track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame));
405                participant.tracks.insert(
406                    track_id.clone(),
407                    RemoteVideoTrack {
408                        frame: None,
409                        _live_kit_track: track,
410                        _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
411                            while let Some(frame) = rx.next().await {
412                                let this = if let Some(this) = this.upgrade(&cx) {
413                                    this
414                                } else {
415                                    break;
416                                };
417
418                                let done = this.update(&mut cx, |this, cx| {
419                                    if let Some(track) =
420                                        this.remote_participants.get_mut(&peer_id).and_then(
421                                            |participant| participant.tracks.get_mut(&track_id),
422                                        )
423                                    {
424                                        track.frame = frame;
425                                        cx.emit(Event::Frame {
426                                            participant_id: peer_id,
427                                            track_id: track_id.clone(),
428                                        });
429                                        false
430                                    } else {
431                                        true
432                                    }
433                                });
434
435                                if done {
436                                    break;
437                                }
438                            }
439                        })),
440                    },
441                );
442            }
443            RemoteVideoTrackUpdate::Unsubscribed {
444                publisher_id,
445                track_id,
446            } => {
447                let peer_id = PeerId(publisher_id.parse()?);
448                let participant = self
449                    .remote_participants
450                    .get_mut(&peer_id)
451                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
452                participant.tracks.remove(&track_id);
453            }
454        }
455
456        cx.notify();
457        Ok(())
458    }
459
460    fn check_invariants(&self) {
461        #[cfg(any(test, feature = "test-support"))]
462        {
463            for participant in self.remote_participants.values() {
464                assert!(self.participant_user_ids.contains(&participant.user.id));
465            }
466
467            for participant in &self.pending_participants {
468                assert!(self.participant_user_ids.contains(&participant.id));
469            }
470
471            assert_eq!(
472                self.participant_user_ids.len(),
473                self.remote_participants.len() + self.pending_participants.len()
474            );
475        }
476    }
477
478    pub(crate) fn call(
479        &mut self,
480        recipient_user_id: u64,
481        initial_project_id: Option<u64>,
482        cx: &mut ModelContext<Self>,
483    ) -> Task<Result<()>> {
484        if self.status.is_offline() {
485            return Task::ready(Err(anyhow!("room is offline")));
486        }
487
488        cx.notify();
489        let client = self.client.clone();
490        let room_id = self.id;
491        self.pending_call_count += 1;
492        cx.spawn(|this, mut cx| async move {
493            let result = client
494                .request(proto::Call {
495                    room_id,
496                    recipient_user_id,
497                    initial_project_id,
498                })
499                .await;
500            this.update(&mut cx, |this, cx| {
501                this.pending_call_count -= 1;
502                if this.should_leave() {
503                    this.leave(cx)?;
504                }
505                result
506            })?;
507            Ok(())
508        })
509    }
510
511    pub(crate) fn share_project(
512        &mut self,
513        project: ModelHandle<Project>,
514        cx: &mut ModelContext<Self>,
515    ) -> Task<Result<u64>> {
516        if let Some(project_id) = project.read(cx).remote_id() {
517            return Task::ready(Ok(project_id));
518        }
519
520        let request = self.client.request(proto::ShareProject {
521            room_id: self.id(),
522            worktrees: project
523                .read(cx)
524                .worktrees(cx)
525                .map(|worktree| {
526                    let worktree = worktree.read(cx);
527                    proto::WorktreeMetadata {
528                        id: worktree.id().to_proto(),
529                        root_name: worktree.root_name().into(),
530                        visible: worktree.is_visible(),
531                    }
532                })
533                .collect(),
534        });
535        cx.spawn(|this, mut cx| async move {
536            let response = request.await?;
537
538            project.update(&mut cx, |project, cx| {
539                project
540                    .shared(response.project_id, cx)
541                    .detach_and_log_err(cx)
542            });
543
544            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
545            this.update(&mut cx, |this, cx| {
546                let active_project = this.local_participant.active_project.as_ref();
547                if active_project.map_or(false, |location| *location == project) {
548                    this.set_location(Some(&project), cx)
549                } else {
550                    Task::ready(Ok(()))
551                }
552            })
553            .await?;
554
555            Ok(response.project_id)
556        })
557    }
558
559    pub fn set_location(
560        &mut self,
561        project: Option<&ModelHandle<Project>>,
562        cx: &mut ModelContext<Self>,
563    ) -> Task<Result<()>> {
564        if self.status.is_offline() {
565            return Task::ready(Err(anyhow!("room is offline")));
566        }
567
568        let client = self.client.clone();
569        let room_id = self.id;
570        let location = if let Some(project) = project {
571            self.local_participant.active_project = Some(project.downgrade());
572            if let Some(project_id) = project.read(cx).remote_id() {
573                proto::participant_location::Variant::SharedProject(
574                    proto::participant_location::SharedProject { id: project_id },
575                )
576            } else {
577                proto::participant_location::Variant::UnsharedProject(
578                    proto::participant_location::UnsharedProject {},
579                )
580            }
581        } else {
582            self.local_participant.active_project = None;
583            proto::participant_location::Variant::External(proto::participant_location::External {})
584        };
585
586        cx.notify();
587        cx.foreground().spawn(async move {
588            client
589                .request(proto::UpdateParticipantLocation {
590                    room_id,
591                    location: Some(proto::ParticipantLocation {
592                        variant: Some(location),
593                    }),
594                })
595                .await?;
596            Ok(())
597        })
598    }
599
600    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
601        if self.status.is_offline() {
602            return Task::ready(Err(anyhow!("room is offline")));
603        }
604
605        let room = if let Some((room, _)) = self.live_kit_room.as_ref() {
606            room.clone()
607        } else {
608            return Task::ready(Err(anyhow!("not connected to LiveKit")));
609        };
610
611        cx.foreground().spawn(async move {
612            let displays = live_kit_client::display_sources().await?;
613            let display = displays
614                .first()
615                .ok_or_else(|| anyhow!("no display found"))?;
616            let track = LocalVideoTrack::screen_share_for_display(&display);
617            room.publish_video_track(&track).await?;
618            Ok(())
619        })
620    }
621}
622
623#[derive(Copy, Clone, PartialEq, Eq)]
624pub enum RoomStatus {
625    Online,
626    Offline,
627}
628
629impl RoomStatus {
630    pub fn is_offline(&self) -> bool {
631        matches!(self, RoomStatus::Offline)
632    }
633}