room.rs

  1use crate::{
  2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
  3    IncomingCall,
  4};
  5use anyhow::{anyhow, Result};
  6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
  7use collections::{BTreeMap, HashSet};
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 10use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate};
 11use postage::watch;
 12use project::Project;
 13use std::sync::Arc;
 14use util::ResultExt;
 15
 16#[derive(Clone, Debug, PartialEq, Eq)]
 17pub enum Event {
 18    RemoteProjectShared {
 19        owner: Arc<User>,
 20        project_id: u64,
 21        worktree_root_names: Vec<String>,
 22    },
 23    RemoteProjectUnshared {
 24        project_id: u64,
 25    },
 26    Left,
 27}
 28
 29pub struct Room {
 30    id: u64,
 31    live_kit_room: Option<(Arc<live_kit_client::Room>, Task<()>)>,
 32    status: RoomStatus,
 33    local_participant: LocalParticipant,
 34    remote_participants: BTreeMap<PeerId, RemoteParticipant>,
 35    pending_participants: Vec<Arc<User>>,
 36    participant_user_ids: HashSet<u64>,
 37    pending_call_count: usize,
 38    leave_when_empty: bool,
 39    client: Arc<Client>,
 40    user_store: ModelHandle<UserStore>,
 41    subscriptions: Vec<client::Subscription>,
 42    pending_room_update: Option<Task<()>>,
 43}
 44
 45impl Entity for Room {
 46    type Event = Event;
 47
 48    fn release(&mut self, _: &mut MutableAppContext) {
 49        self.client.send(proto::LeaveRoom { id: self.id }).log_err();
 50    }
 51}
 52
 53impl Room {
 54    fn new(
 55        id: u64,
 56        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 57        client: Arc<Client>,
 58        user_store: ModelHandle<UserStore>,
 59        cx: &mut ModelContext<Self>,
 60    ) -> Self {
 61        let mut client_status = client.status();
 62        cx.spawn_weak(|this, mut cx| async move {
 63            let is_connected = client_status
 64                .next()
 65                .await
 66                .map_or(false, |s| s.is_connected());
 67            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 68            if !is_connected || client_status.next().await.is_some() {
 69                if let Some(this) = this.upgrade(&cx) {
 70                    let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 71                }
 72            }
 73        })
 74        .detach();
 75
 76        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 77            let room = live_kit_client::Room::new();
 78            let mut track_changes = room.remote_video_track_updates();
 79            let maintain_room = cx.spawn_weak(|this, mut cx| async move {
 80                while let Some(track_change) = track_changes.next().await {
 81                    let this = if let Some(this) = this.upgrade(&cx) {
 82                        this
 83                    } else {
 84                        break;
 85                    };
 86
 87                    this.update(&mut cx, |this, cx| {
 88                        this.remote_video_track_updated(track_change, cx).log_err()
 89                    });
 90                }
 91            });
 92            cx.foreground()
 93                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 94                .detach_and_log_err(cx);
 95            Some((room, maintain_room))
 96        } else {
 97            None
 98        };
 99
100        Self {
101            id,
102            live_kit_room,
103            status: RoomStatus::Online,
104            participant_user_ids: Default::default(),
105            local_participant: Default::default(),
106            remote_participants: Default::default(),
107            pending_participants: Default::default(),
108            pending_call_count: 0,
109            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
110            leave_when_empty: false,
111            pending_room_update: None,
112            client,
113            user_store,
114        }
115    }
116
117    pub(crate) fn create(
118        recipient_user_id: u64,
119        initial_project: Option<ModelHandle<Project>>,
120        client: Arc<Client>,
121        user_store: ModelHandle<UserStore>,
122        cx: &mut MutableAppContext,
123    ) -> Task<Result<ModelHandle<Self>>> {
124        cx.spawn(|mut cx| async move {
125            let response = client.request(proto::CreateRoom {}).await?;
126            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
127            let room = cx.add_model(|cx| {
128                Self::new(
129                    room_proto.id,
130                    response.live_kit_connection_info,
131                    client,
132                    user_store,
133                    cx,
134                )
135            });
136
137            let initial_project_id = if let Some(initial_project) = initial_project {
138                let initial_project_id = room
139                    .update(&mut cx, |room, cx| {
140                        room.share_project(initial_project.clone(), cx)
141                    })
142                    .await?;
143                Some(initial_project_id)
144            } else {
145                None
146            };
147
148            match room
149                .update(&mut cx, |room, cx| {
150                    room.leave_when_empty = true;
151                    room.call(recipient_user_id, initial_project_id, cx)
152                })
153                .await
154            {
155                Ok(()) => Ok(room),
156                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
157            }
158        })
159    }
160
161    pub(crate) fn join(
162        call: &IncomingCall,
163        client: Arc<Client>,
164        user_store: ModelHandle<UserStore>,
165        cx: &mut MutableAppContext,
166    ) -> Task<Result<ModelHandle<Self>>> {
167        let room_id = call.room_id;
168        cx.spawn(|mut cx| async move {
169            let response = client.request(proto::JoinRoom { id: room_id }).await?;
170            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
171            let room = cx.add_model(|cx| {
172                Self::new(
173                    room_id,
174                    response.live_kit_connection_info,
175                    client,
176                    user_store,
177                    cx,
178                )
179            });
180            room.update(&mut cx, |room, cx| {
181                room.leave_when_empty = true;
182                room.apply_room_update(room_proto, cx)?;
183                anyhow::Ok(())
184            })?;
185            Ok(room)
186        })
187    }
188
189    fn should_leave(&self) -> bool {
190        self.leave_when_empty
191            && self.pending_room_update.is_none()
192            && self.pending_participants.is_empty()
193            && self.remote_participants.is_empty()
194            && self.pending_call_count == 0
195    }
196
197    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
198        if self.status.is_offline() {
199            return Err(anyhow!("room is offline"));
200        }
201
202        cx.notify();
203        cx.emit(Event::Left);
204        self.status = RoomStatus::Offline;
205        self.remote_participants.clear();
206        self.pending_participants.clear();
207        self.participant_user_ids.clear();
208        self.subscriptions.clear();
209        self.client.send(proto::LeaveRoom { id: self.id })?;
210        Ok(())
211    }
212
213    pub fn id(&self) -> u64 {
214        self.id
215    }
216
217    pub fn status(&self) -> RoomStatus {
218        self.status
219    }
220
221    pub fn local_participant(&self) -> &LocalParticipant {
222        &self.local_participant
223    }
224
225    pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
226        &self.remote_participants
227    }
228
229    pub fn pending_participants(&self) -> &[Arc<User>] {
230        &self.pending_participants
231    }
232
233    pub fn contains_participant(&self, user_id: u64) -> bool {
234        self.participant_user_ids.contains(&user_id)
235    }
236
237    async fn handle_room_updated(
238        this: ModelHandle<Self>,
239        envelope: TypedEnvelope<proto::RoomUpdated>,
240        _: Arc<Client>,
241        mut cx: AsyncAppContext,
242    ) -> Result<()> {
243        let room = envelope
244            .payload
245            .room
246            .ok_or_else(|| anyhow!("invalid room"))?;
247        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
248    }
249
250    fn apply_room_update(
251        &mut self,
252        mut room: proto::Room,
253        cx: &mut ModelContext<Self>,
254    ) -> Result<()> {
255        // Filter ourselves out from the room's participants.
256        let local_participant_ix = room
257            .participants
258            .iter()
259            .position(|participant| Some(participant.user_id) == self.client.user_id());
260        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
261
262        let remote_participant_user_ids = room
263            .participants
264            .iter()
265            .map(|p| p.user_id)
266            .collect::<Vec<_>>();
267        let (remote_participants, pending_participants) =
268            self.user_store.update(cx, move |user_store, cx| {
269                (
270                    user_store.get_users(remote_participant_user_ids, cx),
271                    user_store.get_users(room.pending_participant_user_ids, cx),
272                )
273            });
274        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
275            let (remote_participants, pending_participants) =
276                futures::join!(remote_participants, pending_participants);
277
278            this.update(&mut cx, |this, cx| {
279                this.participant_user_ids.clear();
280
281                if let Some(participant) = local_participant {
282                    this.local_participant.projects = participant.projects;
283                } else {
284                    this.local_participant.projects.clear();
285                }
286
287                if let Some(participants) = remote_participants.log_err() {
288                    for (participant, user) in room.participants.into_iter().zip(participants) {
289                        let peer_id = PeerId(participant.peer_id);
290                        this.participant_user_ids.insert(participant.user_id);
291
292                        let old_projects = this
293                            .remote_participants
294                            .get(&peer_id)
295                            .into_iter()
296                            .flat_map(|existing| &existing.projects)
297                            .map(|project| project.id)
298                            .collect::<HashSet<_>>();
299                        let new_projects = participant
300                            .projects
301                            .iter()
302                            .map(|project| project.id)
303                            .collect::<HashSet<_>>();
304
305                        for project in &participant.projects {
306                            if !old_projects.contains(&project.id) {
307                                cx.emit(Event::RemoteProjectShared {
308                                    owner: user.clone(),
309                                    project_id: project.id,
310                                    worktree_root_names: project.worktree_root_names.clone(),
311                                });
312                            }
313                        }
314
315                        for unshared_project_id in old_projects.difference(&new_projects) {
316                            cx.emit(Event::RemoteProjectUnshared {
317                                project_id: *unshared_project_id,
318                            });
319                        }
320
321                        this.remote_participants.insert(
322                            peer_id,
323                            RemoteParticipant {
324                                user: user.clone(),
325                                projects: participant.projects,
326                                location: ParticipantLocation::from_proto(participant.location)
327                                    .unwrap_or(ParticipantLocation::External),
328                                tracks: Default::default(),
329                            },
330                        );
331
332                        if let Some((room, _)) = this.live_kit_room.as_ref() {
333                            println!("getting video tracks for peer id {}", peer_id.0.to_string());
334                            let tracks = room.remote_video_tracks(&peer_id.0.to_string());
335                            dbg!(tracks.len());
336                            for track in tracks {
337                                dbg!(track.sid(), track.publisher_id());
338                                this.remote_video_track_updated(
339                                    RemoteVideoTrackUpdate::Subscribed(track),
340                                    cx,
341                                )
342                                .log_err();
343                            }
344                        }
345                    }
346
347                    this.remote_participants.retain(|_, participant| {
348                        if this.participant_user_ids.contains(&participant.user.id) {
349                            true
350                        } else {
351                            for project in &participant.projects {
352                                cx.emit(Event::RemoteProjectUnshared {
353                                    project_id: project.id,
354                                });
355                            }
356                            false
357                        }
358                    });
359                }
360
361                if let Some(pending_participants) = pending_participants.log_err() {
362                    this.pending_participants = pending_participants;
363                    for participant in &this.pending_participants {
364                        this.participant_user_ids.insert(participant.id);
365                    }
366                }
367
368                this.pending_room_update.take();
369                if this.should_leave() {
370                    let _ = this.leave(cx);
371                }
372
373                this.check_invariants();
374                cx.notify();
375            });
376        }));
377
378        cx.notify();
379        Ok(())
380    }
381
382    fn remote_video_track_updated(
383        &mut self,
384        change: RemoteVideoTrackUpdate,
385        cx: &mut ModelContext<Self>,
386    ) -> Result<()> {
387        match change {
388            RemoteVideoTrackUpdate::Subscribed(track) => {
389                dbg!(track.publisher_id(), track.sid());
390                let peer_id = PeerId(track.publisher_id().parse()?);
391                let track_id = track.sid().to_string();
392                let participant = self
393                    .remote_participants
394                    .get_mut(&peer_id)
395                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
396                let (mut tx, mut rx) = watch::channel();
397                track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame));
398                participant.tracks.insert(
399                    track_id.clone(),
400                    RemoteVideoTrack {
401                        frame: None,
402                        _live_kit_track: track,
403                        _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
404                            while let Some(frame) = rx.next().await {
405                                let this = if let Some(this) = this.upgrade(&cx) {
406                                    this
407                                } else {
408                                    break;
409                                };
410
411                                let done = this.update(&mut cx, |this, cx| {
412                                    // TODO: replace this with an emit.
413                                    cx.notify();
414                                    if let Some(track) =
415                                        this.remote_participants.get_mut(&peer_id).and_then(
416                                            |participant| participant.tracks.get_mut(&track_id),
417                                        )
418                                    {
419                                        track.frame = frame;
420                                        false
421                                    } else {
422                                        true
423                                    }
424                                });
425
426                                if done {
427                                    break;
428                                }
429                            }
430                        })),
431                    },
432                );
433            }
434            RemoteVideoTrackUpdate::Unsubscribed {
435                publisher_id,
436                track_id,
437            } => {
438                let peer_id = PeerId(publisher_id.parse()?);
439                let participant = self
440                    .remote_participants
441                    .get_mut(&peer_id)
442                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
443                participant.tracks.remove(&track_id);
444            }
445        }
446
447        cx.notify();
448        Ok(())
449    }
450
451    fn check_invariants(&self) {
452        #[cfg(any(test, feature = "test-support"))]
453        {
454            for participant in self.remote_participants.values() {
455                assert!(self.participant_user_ids.contains(&participant.user.id));
456            }
457
458            for participant in &self.pending_participants {
459                assert!(self.participant_user_ids.contains(&participant.id));
460            }
461
462            assert_eq!(
463                self.participant_user_ids.len(),
464                self.remote_participants.len() + self.pending_participants.len()
465            );
466        }
467    }
468
469    pub(crate) fn call(
470        &mut self,
471        recipient_user_id: u64,
472        initial_project_id: Option<u64>,
473        cx: &mut ModelContext<Self>,
474    ) -> Task<Result<()>> {
475        if self.status.is_offline() {
476            return Task::ready(Err(anyhow!("room is offline")));
477        }
478
479        cx.notify();
480        let client = self.client.clone();
481        let room_id = self.id;
482        self.pending_call_count += 1;
483        cx.spawn(|this, mut cx| async move {
484            let result = client
485                .request(proto::Call {
486                    room_id,
487                    recipient_user_id,
488                    initial_project_id,
489                })
490                .await;
491            this.update(&mut cx, |this, cx| {
492                this.pending_call_count -= 1;
493                if this.should_leave() {
494                    this.leave(cx)?;
495                }
496                result
497            })?;
498            Ok(())
499        })
500    }
501
502    pub(crate) fn share_project(
503        &mut self,
504        project: ModelHandle<Project>,
505        cx: &mut ModelContext<Self>,
506    ) -> Task<Result<u64>> {
507        if let Some(project_id) = project.read(cx).remote_id() {
508            return Task::ready(Ok(project_id));
509        }
510
511        let request = self.client.request(proto::ShareProject {
512            room_id: self.id(),
513            worktrees: project
514                .read(cx)
515                .worktrees(cx)
516                .map(|worktree| {
517                    let worktree = worktree.read(cx);
518                    proto::WorktreeMetadata {
519                        id: worktree.id().to_proto(),
520                        root_name: worktree.root_name().into(),
521                        visible: worktree.is_visible(),
522                    }
523                })
524                .collect(),
525        });
526        cx.spawn(|this, mut cx| async move {
527            let response = request.await?;
528
529            project.update(&mut cx, |project, cx| {
530                project
531                    .shared(response.project_id, cx)
532                    .detach_and_log_err(cx)
533            });
534
535            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
536            this.update(&mut cx, |this, cx| {
537                let active_project = this.local_participant.active_project.as_ref();
538                if active_project.map_or(false, |location| *location == project) {
539                    this.set_location(Some(&project), cx)
540                } else {
541                    Task::ready(Ok(()))
542                }
543            })
544            .await?;
545
546            Ok(response.project_id)
547        })
548    }
549
550    pub fn set_location(
551        &mut self,
552        project: Option<&ModelHandle<Project>>,
553        cx: &mut ModelContext<Self>,
554    ) -> Task<Result<()>> {
555        if self.status.is_offline() {
556            return Task::ready(Err(anyhow!("room is offline")));
557        }
558
559        let client = self.client.clone();
560        let room_id = self.id;
561        let location = if let Some(project) = project {
562            self.local_participant.active_project = Some(project.downgrade());
563            if let Some(project_id) = project.read(cx).remote_id() {
564                proto::participant_location::Variant::SharedProject(
565                    proto::participant_location::SharedProject { id: project_id },
566                )
567            } else {
568                proto::participant_location::Variant::UnsharedProject(
569                    proto::participant_location::UnsharedProject {},
570                )
571            }
572        } else {
573            self.local_participant.active_project = None;
574            proto::participant_location::Variant::External(proto::participant_location::External {})
575        };
576
577        cx.notify();
578        cx.foreground().spawn(async move {
579            client
580                .request(proto::UpdateParticipantLocation {
581                    room_id,
582                    location: Some(proto::ParticipantLocation {
583                        variant: Some(location),
584                    }),
585                })
586                .await?;
587            Ok(())
588        })
589    }
590
591    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
592        if self.status.is_offline() {
593            return Task::ready(Err(anyhow!("room is offline")));
594        }
595
596        let room = if let Some((room, _)) = self.live_kit_room.as_ref() {
597            room.clone()
598        } else {
599            return Task::ready(Err(anyhow!("not connected to LiveKit")));
600        };
601
602        cx.foreground().spawn(async move {
603            let displays = live_kit_client::display_sources().await?;
604            let display = displays
605                .first()
606                .ok_or_else(|| anyhow!("no display found"))?;
607            let track = LocalVideoTrack::screen_share_for_display(&display);
608            room.publish_video_track(&track).await?;
609            Ok(())
610        })
611    }
612}
613
614#[derive(Copy, Clone, PartialEq, Eq)]
615pub enum RoomStatus {
616    Online,
617    Offline,
618}
619
620impl RoomStatus {
621    pub fn is_offline(&self) -> bool {
622        matches!(self, RoomStatus::Offline)
623    }
624}