1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
11use postage::stream::Stream;
12use project::Project;
13use std::{mem, sync::Arc};
14use util::{post_inc, ResultExt};
15
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum Event {
18 ParticipantLocationChanged {
19 participant_id: PeerId,
20 },
21 RemoteVideoTracksChanged {
22 participant_id: PeerId,
23 },
24 RemoteProjectShared {
25 owner: Arc<User>,
26 project_id: u64,
27 worktree_root_names: Vec<String>,
28 },
29 RemoteProjectUnshared {
30 project_id: u64,
31 },
32 Left,
33}
34
35pub struct Room {
36 id: u64,
37 live_kit: Option<LiveKitRoom>,
38 status: RoomStatus,
39 local_participant: LocalParticipant,
40 remote_participants: BTreeMap<PeerId, RemoteParticipant>,
41 pending_participants: Vec<Arc<User>>,
42 participant_user_ids: HashSet<u64>,
43 pending_call_count: usize,
44 leave_when_empty: bool,
45 client: Arc<Client>,
46 user_store: ModelHandle<UserStore>,
47 subscriptions: Vec<client::Subscription>,
48 pending_room_update: Option<Task<()>>,
49}
50
51impl Entity for Room {
52 type Event = Event;
53
54 fn release(&mut self, _: &mut MutableAppContext) {
55 if self.status.is_online() {
56 self.client.send(proto::LeaveRoom {}).log_err();
57 }
58 }
59}
60
61impl Room {
62 fn new(
63 id: u64,
64 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
65 client: Arc<Client>,
66 user_store: ModelHandle<UserStore>,
67 cx: &mut ModelContext<Self>,
68 ) -> Self {
69 let mut client_status = client.status();
70 cx.spawn_weak(|this, mut cx| async move {
71 let is_connected = client_status
72 .next()
73 .await
74 .map_or(false, |s| s.is_connected());
75 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
76 if !is_connected || client_status.next().await.is_some() {
77 if let Some(this) = this.upgrade(&cx) {
78 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
79 }
80 }
81 })
82 .detach();
83
84 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
85 let room = live_kit_client::Room::new();
86 let mut status = room.status();
87 // Consume the initial status of the room.
88 let _ = status.try_recv();
89 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
90 while let Some(status) = status.next().await {
91 let this = if let Some(this) = this.upgrade(&cx) {
92 this
93 } else {
94 break;
95 };
96
97 if status == live_kit_client::ConnectionState::Disconnected {
98 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
99 break;
100 }
101 }
102 });
103
104 let mut track_changes = room.remote_video_track_updates();
105 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
106 while let Some(track_change) = track_changes.next().await {
107 let this = if let Some(this) = this.upgrade(&cx) {
108 this
109 } else {
110 break;
111 };
112
113 this.update(&mut cx, |this, cx| {
114 this.remote_video_track_updated(track_change, cx).log_err()
115 });
116 }
117 });
118
119 cx.foreground()
120 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
121 .detach_and_log_err(cx);
122
123 Some(LiveKitRoom {
124 room,
125 screen_track: ScreenTrack::None,
126 next_publish_id: 0,
127 _maintain_room,
128 _maintain_tracks,
129 })
130 } else {
131 None
132 };
133
134 Self {
135 id,
136 live_kit: live_kit_room,
137 status: RoomStatus::Online,
138 participant_user_ids: Default::default(),
139 local_participant: Default::default(),
140 remote_participants: Default::default(),
141 pending_participants: Default::default(),
142 pending_call_count: 0,
143 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
144 leave_when_empty: false,
145 pending_room_update: None,
146 client,
147 user_store,
148 }
149 }
150
151 pub(crate) fn create(
152 called_user_id: u64,
153 initial_project: Option<ModelHandle<Project>>,
154 client: Arc<Client>,
155 user_store: ModelHandle<UserStore>,
156 cx: &mut MutableAppContext,
157 ) -> Task<Result<ModelHandle<Self>>> {
158 cx.spawn(|mut cx| async move {
159 let response = client.request(proto::CreateRoom {}).await?;
160 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
161 let room = cx.add_model(|cx| {
162 Self::new(
163 room_proto.id,
164 response.live_kit_connection_info,
165 client,
166 user_store,
167 cx,
168 )
169 });
170
171 let initial_project_id = if let Some(initial_project) = initial_project {
172 let initial_project_id = room
173 .update(&mut cx, |room, cx| {
174 room.share_project(initial_project.clone(), cx)
175 })
176 .await?;
177 Some(initial_project_id)
178 } else {
179 None
180 };
181
182 match room
183 .update(&mut cx, |room, cx| {
184 room.leave_when_empty = true;
185 room.call(called_user_id, initial_project_id, cx)
186 })
187 .await
188 {
189 Ok(()) => Ok(room),
190 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
191 }
192 })
193 }
194
195 pub(crate) fn join(
196 call: &IncomingCall,
197 client: Arc<Client>,
198 user_store: ModelHandle<UserStore>,
199 cx: &mut MutableAppContext,
200 ) -> Task<Result<ModelHandle<Self>>> {
201 let room_id = call.room_id;
202 cx.spawn(|mut cx| async move {
203 let response = client.request(proto::JoinRoom { id: room_id }).await?;
204 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
205 let room = cx.add_model(|cx| {
206 Self::new(
207 room_id,
208 response.live_kit_connection_info,
209 client,
210 user_store,
211 cx,
212 )
213 });
214 room.update(&mut cx, |room, cx| {
215 room.leave_when_empty = true;
216 room.apply_room_update(room_proto, cx)?;
217 anyhow::Ok(())
218 })?;
219 Ok(room)
220 })
221 }
222
223 fn should_leave(&self) -> bool {
224 self.leave_when_empty
225 && self.pending_room_update.is_none()
226 && self.pending_participants.is_empty()
227 && self.remote_participants.is_empty()
228 && self.pending_call_count == 0
229 }
230
231 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
232 if self.status.is_offline() {
233 return Err(anyhow!("room is offline"));
234 }
235
236 cx.notify();
237 cx.emit(Event::Left);
238 self.status = RoomStatus::Offline;
239 self.remote_participants.clear();
240 self.pending_participants.clear();
241 self.participant_user_ids.clear();
242 self.subscriptions.clear();
243 self.live_kit.take();
244 self.client.send(proto::LeaveRoom {})?;
245 Ok(())
246 }
247
248 pub fn id(&self) -> u64 {
249 self.id
250 }
251
252 pub fn status(&self) -> RoomStatus {
253 self.status
254 }
255
256 pub fn local_participant(&self) -> &LocalParticipant {
257 &self.local_participant
258 }
259
260 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
261 &self.remote_participants
262 }
263
264 pub fn pending_participants(&self) -> &[Arc<User>] {
265 &self.pending_participants
266 }
267
268 pub fn contains_participant(&self, user_id: u64) -> bool {
269 self.participant_user_ids.contains(&user_id)
270 }
271
272 async fn handle_room_updated(
273 this: ModelHandle<Self>,
274 envelope: TypedEnvelope<proto::RoomUpdated>,
275 _: Arc<Client>,
276 mut cx: AsyncAppContext,
277 ) -> Result<()> {
278 let room = envelope
279 .payload
280 .room
281 .ok_or_else(|| anyhow!("invalid room"))?;
282 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
283 }
284
285 fn apply_room_update(
286 &mut self,
287 mut room: proto::Room,
288 cx: &mut ModelContext<Self>,
289 ) -> Result<()> {
290 // Filter ourselves out from the room's participants.
291 let local_participant_ix = room
292 .participants
293 .iter()
294 .position(|participant| Some(participant.user_id) == self.client.user_id());
295 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
296
297 let pending_participant_user_ids = room
298 .pending_participants
299 .iter()
300 .map(|p| p.user_id)
301 .collect::<Vec<_>>();
302 let remote_participant_user_ids = room
303 .participants
304 .iter()
305 .map(|p| p.user_id)
306 .collect::<Vec<_>>();
307 let (remote_participants, pending_participants) =
308 self.user_store.update(cx, move |user_store, cx| {
309 (
310 user_store.get_users(remote_participant_user_ids, cx),
311 user_store.get_users(pending_participant_user_ids, cx),
312 )
313 });
314 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
315 let (remote_participants, pending_participants) =
316 futures::join!(remote_participants, pending_participants);
317
318 this.update(&mut cx, |this, cx| {
319 this.participant_user_ids.clear();
320
321 if let Some(participant) = local_participant {
322 this.local_participant.projects = participant.projects;
323 } else {
324 this.local_participant.projects.clear();
325 }
326
327 if let Some(participants) = remote_participants.log_err() {
328 for (participant, user) in room.participants.into_iter().zip(participants) {
329 let peer_id = PeerId(participant.peer_id);
330 this.participant_user_ids.insert(participant.user_id);
331
332 let old_projects = this
333 .remote_participants
334 .get(&peer_id)
335 .into_iter()
336 .flat_map(|existing| &existing.projects)
337 .map(|project| project.id)
338 .collect::<HashSet<_>>();
339 let new_projects = participant
340 .projects
341 .iter()
342 .map(|project| project.id)
343 .collect::<HashSet<_>>();
344
345 for project in &participant.projects {
346 if !old_projects.contains(&project.id) {
347 cx.emit(Event::RemoteProjectShared {
348 owner: user.clone(),
349 project_id: project.id,
350 worktree_root_names: project.worktree_root_names.clone(),
351 });
352 }
353 }
354
355 for unshared_project_id in old_projects.difference(&new_projects) {
356 cx.emit(Event::RemoteProjectUnshared {
357 project_id: *unshared_project_id,
358 });
359 }
360
361 let location = ParticipantLocation::from_proto(participant.location)
362 .unwrap_or(ParticipantLocation::External);
363 if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
364 {
365 remote_participant.projects = participant.projects;
366 if location != remote_participant.location {
367 remote_participant.location = location;
368 cx.emit(Event::ParticipantLocationChanged {
369 participant_id: peer_id,
370 });
371 }
372 } else {
373 this.remote_participants.insert(
374 peer_id,
375 RemoteParticipant {
376 user: user.clone(),
377 projects: participant.projects,
378 location,
379 tracks: Default::default(),
380 },
381 );
382
383 if let Some(live_kit) = this.live_kit.as_ref() {
384 let tracks =
385 live_kit.room.remote_video_tracks(&peer_id.0.to_string());
386 for track in tracks {
387 this.remote_video_track_updated(
388 RemoteVideoTrackUpdate::Subscribed(track),
389 cx,
390 )
391 .log_err();
392 }
393 }
394 }
395 }
396
397 this.remote_participants.retain(|_, participant| {
398 if this.participant_user_ids.contains(&participant.user.id) {
399 true
400 } else {
401 for project in &participant.projects {
402 cx.emit(Event::RemoteProjectUnshared {
403 project_id: project.id,
404 });
405 }
406 false
407 }
408 });
409 }
410
411 if let Some(pending_participants) = pending_participants.log_err() {
412 this.pending_participants = pending_participants;
413 for participant in &this.pending_participants {
414 this.participant_user_ids.insert(participant.id);
415 }
416 }
417
418 this.pending_room_update.take();
419 if this.should_leave() {
420 let _ = this.leave(cx);
421 }
422
423 this.check_invariants();
424 cx.notify();
425 });
426 }));
427
428 cx.notify();
429 Ok(())
430 }
431
432 fn remote_video_track_updated(
433 &mut self,
434 change: RemoteVideoTrackUpdate,
435 cx: &mut ModelContext<Self>,
436 ) -> Result<()> {
437 match change {
438 RemoteVideoTrackUpdate::Subscribed(track) => {
439 let peer_id = PeerId(track.publisher_id().parse()?);
440 let track_id = track.sid().to_string();
441 let participant = self
442 .remote_participants
443 .get_mut(&peer_id)
444 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
445 participant.tracks.insert(
446 track_id.clone(),
447 Arc::new(RemoteVideoTrack {
448 live_kit_track: track,
449 }),
450 );
451 cx.emit(Event::RemoteVideoTracksChanged {
452 participant_id: peer_id,
453 });
454 }
455 RemoteVideoTrackUpdate::Unsubscribed {
456 publisher_id,
457 track_id,
458 } => {
459 let peer_id = PeerId(publisher_id.parse()?);
460 let participant = self
461 .remote_participants
462 .get_mut(&peer_id)
463 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
464 participant.tracks.remove(&track_id);
465 cx.emit(Event::RemoteVideoTracksChanged {
466 participant_id: peer_id,
467 });
468 }
469 }
470
471 cx.notify();
472 Ok(())
473 }
474
475 fn check_invariants(&self) {
476 #[cfg(any(test, feature = "test-support"))]
477 {
478 for participant in self.remote_participants.values() {
479 assert!(self.participant_user_ids.contains(&participant.user.id));
480 }
481
482 for participant in &self.pending_participants {
483 assert!(self.participant_user_ids.contains(&participant.id));
484 }
485
486 assert_eq!(
487 self.participant_user_ids.len(),
488 self.remote_participants.len() + self.pending_participants.len()
489 );
490 }
491 }
492
493 pub(crate) fn call(
494 &mut self,
495 called_user_id: u64,
496 initial_project_id: Option<u64>,
497 cx: &mut ModelContext<Self>,
498 ) -> Task<Result<()>> {
499 if self.status.is_offline() {
500 return Task::ready(Err(anyhow!("room is offline")));
501 }
502
503 cx.notify();
504 let client = self.client.clone();
505 let room_id = self.id;
506 self.pending_call_count += 1;
507 cx.spawn(|this, mut cx| async move {
508 let result = client
509 .request(proto::Call {
510 room_id,
511 called_user_id,
512 initial_project_id,
513 })
514 .await;
515 this.update(&mut cx, |this, cx| {
516 this.pending_call_count -= 1;
517 if this.should_leave() {
518 this.leave(cx)?;
519 }
520 result
521 })?;
522 Ok(())
523 })
524 }
525
526 pub(crate) fn share_project(
527 &mut self,
528 project: ModelHandle<Project>,
529 cx: &mut ModelContext<Self>,
530 ) -> Task<Result<u64>> {
531 if let Some(project_id) = project.read(cx).remote_id() {
532 return Task::ready(Ok(project_id));
533 }
534
535 let request = self.client.request(proto::ShareProject {
536 room_id: self.id(),
537 worktrees: project
538 .read(cx)
539 .worktrees(cx)
540 .map(|worktree| {
541 let worktree = worktree.read(cx);
542 proto::WorktreeMetadata {
543 id: worktree.id().to_proto(),
544 root_name: worktree.root_name().into(),
545 visible: worktree.is_visible(),
546 abs_path: worktree.abs_path().to_string_lossy().into(),
547 }
548 })
549 .collect(),
550 });
551 cx.spawn(|this, mut cx| async move {
552 let response = request.await?;
553
554 project.update(&mut cx, |project, cx| {
555 project
556 .shared(response.project_id, cx)
557 .detach_and_log_err(cx)
558 });
559
560 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
561 this.update(&mut cx, |this, cx| {
562 let active_project = this.local_participant.active_project.as_ref();
563 if active_project.map_or(false, |location| *location == project) {
564 this.set_location(Some(&project), cx)
565 } else {
566 Task::ready(Ok(()))
567 }
568 })
569 .await?;
570
571 Ok(response.project_id)
572 })
573 }
574
575 pub(crate) fn set_location(
576 &mut self,
577 project: Option<&ModelHandle<Project>>,
578 cx: &mut ModelContext<Self>,
579 ) -> Task<Result<()>> {
580 if self.status.is_offline() {
581 return Task::ready(Err(anyhow!("room is offline")));
582 }
583
584 let client = self.client.clone();
585 let room_id = self.id;
586 let location = if let Some(project) = project {
587 self.local_participant.active_project = Some(project.downgrade());
588 if let Some(project_id) = project.read(cx).remote_id() {
589 proto::participant_location::Variant::SharedProject(
590 proto::participant_location::SharedProject { id: project_id },
591 )
592 } else {
593 proto::participant_location::Variant::UnsharedProject(
594 proto::participant_location::UnsharedProject {},
595 )
596 }
597 } else {
598 self.local_participant.active_project = None;
599 proto::participant_location::Variant::External(proto::participant_location::External {})
600 };
601
602 cx.notify();
603 cx.foreground().spawn(async move {
604 client
605 .request(proto::UpdateParticipantLocation {
606 room_id,
607 location: Some(proto::ParticipantLocation {
608 variant: Some(location),
609 }),
610 })
611 .await?;
612 Ok(())
613 })
614 }
615
616 pub fn is_screen_sharing(&self) -> bool {
617 self.live_kit.as_ref().map_or(false, |live_kit| {
618 !matches!(live_kit.screen_track, ScreenTrack::None)
619 })
620 }
621
622 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
623 if self.status.is_offline() {
624 return Task::ready(Err(anyhow!("room is offline")));
625 } else if self.is_screen_sharing() {
626 return Task::ready(Err(anyhow!("screen was already shared")));
627 }
628
629 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
630 let publish_id = post_inc(&mut live_kit.next_publish_id);
631 live_kit.screen_track = ScreenTrack::Pending { publish_id };
632 cx.notify();
633 (live_kit.room.display_sources(), publish_id)
634 } else {
635 return Task::ready(Err(anyhow!("live-kit was not initialized")));
636 };
637
638 cx.spawn_weak(|this, mut cx| async move {
639 let publish_track = async {
640 let displays = displays.await?;
641 let display = displays
642 .first()
643 .ok_or_else(|| anyhow!("no display found"))?;
644 let track = LocalVideoTrack::screen_share_for_display(&display);
645 this.upgrade(&cx)
646 .ok_or_else(|| anyhow!("room was dropped"))?
647 .read_with(&cx, |this, _| {
648 this.live_kit
649 .as_ref()
650 .map(|live_kit| live_kit.room.publish_video_track(&track))
651 })
652 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
653 .await
654 };
655
656 let publication = publish_track.await;
657 this.upgrade(&cx)
658 .ok_or_else(|| anyhow!("room was dropped"))?
659 .update(&mut cx, |this, cx| {
660 let live_kit = this
661 .live_kit
662 .as_mut()
663 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
664
665 let canceled = if let ScreenTrack::Pending {
666 publish_id: cur_publish_id,
667 } = &live_kit.screen_track
668 {
669 *cur_publish_id != publish_id
670 } else {
671 true
672 };
673
674 match publication {
675 Ok(publication) => {
676 if canceled {
677 live_kit.room.unpublish_track(publication);
678 } else {
679 live_kit.screen_track = ScreenTrack::Published(publication);
680 cx.notify();
681 }
682 Ok(())
683 }
684 Err(error) => {
685 if canceled {
686 Ok(())
687 } else {
688 live_kit.screen_track = ScreenTrack::None;
689 cx.notify();
690 Err(error)
691 }
692 }
693 }
694 })
695 })
696 }
697
698 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
699 if self.status.is_offline() {
700 return Err(anyhow!("room is offline"));
701 }
702
703 let live_kit = self
704 .live_kit
705 .as_mut()
706 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
707 match mem::take(&mut live_kit.screen_track) {
708 ScreenTrack::None => Err(anyhow!("screen was not shared")),
709 ScreenTrack::Pending { .. } => {
710 cx.notify();
711 Ok(())
712 }
713 ScreenTrack::Published(track) => {
714 live_kit.room.unpublish_track(track);
715 cx.notify();
716 Ok(())
717 }
718 }
719 }
720
/// Test helper: forwards `sources` to the LiveKit room's display sources.
///
/// # Panics
///
/// Panics when the room has no LiveKit room.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
729}
730
731struct LiveKitRoom {
732 room: Arc<live_kit_client::Room>,
733 screen_track: ScreenTrack,
734 next_publish_id: usize,
735 _maintain_room: Task<()>,
736 _maintain_tracks: Task<()>,
737}
738
739enum ScreenTrack {
740 None,
741 Pending { publish_id: usize },
742 Published(LocalTrackPublication),
743}
744
745impl Default for ScreenTrack {
746 fn default() -> Self {
747 Self::None
748 }
749}
750
/// Whether a room is still in use or has been left.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Offline,
}

impl RoomStatus {
    /// Returns `true` for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        match self {
            RoomStatus::Offline => true,
            RoomStatus::Online => false,
        }
    }

    /// Returns `true` for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        match self {
            RoomStatus::Online => true,
            RoomStatus::Offline => false,
        }
    }
}