1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
11use postage::stream::Stream;
12use project::Project;
13use std::{mem, os::unix::prelude::OsStrExt, sync::Arc};
14use util::{post_inc, ResultExt};
15
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum Event {
18 ParticipantLocationChanged {
19 participant_id: PeerId,
20 },
21 RemoteVideoTracksChanged {
22 participant_id: PeerId,
23 },
24 RemoteProjectShared {
25 owner: Arc<User>,
26 project_id: u64,
27 worktree_root_names: Vec<String>,
28 },
29 RemoteProjectUnshared {
30 project_id: u64,
31 },
32 Left,
33}
34
35pub struct Room {
36 id: u64,
37 live_kit: Option<LiveKitRoom>,
38 status: RoomStatus,
39 local_participant: LocalParticipant,
40 remote_participants: BTreeMap<PeerId, RemoteParticipant>,
41 pending_participants: Vec<Arc<User>>,
42 participant_user_ids: HashSet<u64>,
43 pending_call_count: usize,
44 leave_when_empty: bool,
45 client: Arc<Client>,
46 user_store: ModelHandle<UserStore>,
47 subscriptions: Vec<client::Subscription>,
48 pending_room_update: Option<Task<()>>,
49}
50
51impl Entity for Room {
52 type Event = Event;
53
54 fn release(&mut self, _: &mut MutableAppContext) {
55 if self.status.is_online() {
56 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
57 }
58 }
59}
60
61impl Room {
62 fn new(
63 id: u64,
64 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
65 client: Arc<Client>,
66 user_store: ModelHandle<UserStore>,
67 cx: &mut ModelContext<Self>,
68 ) -> Self {
69 let mut client_status = client.status();
70 cx.spawn_weak(|this, mut cx| async move {
71 let is_connected = client_status
72 .next()
73 .await
74 .map_or(false, |s| s.is_connected());
75 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
76 if !is_connected || client_status.next().await.is_some() {
77 if let Some(this) = this.upgrade(&cx) {
78 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
79 }
80 }
81 })
82 .detach();
83
84 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
85 let room = live_kit_client::Room::new();
86 let mut status = room.status();
87 // Consume the initial status of the room.
88 let _ = status.try_recv();
89 let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
90 while let Some(status) = status.next().await {
91 let this = if let Some(this) = this.upgrade(&cx) {
92 this
93 } else {
94 break;
95 };
96
97 if status == live_kit_client::ConnectionState::Disconnected {
98 this.update(&mut cx, |this, cx| this.leave(cx).log_err());
99 break;
100 }
101 }
102 });
103
104 let mut track_changes = room.remote_video_track_updates();
105 let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
106 while let Some(track_change) = track_changes.next().await {
107 let this = if let Some(this) = this.upgrade(&cx) {
108 this
109 } else {
110 break;
111 };
112
113 this.update(&mut cx, |this, cx| {
114 this.remote_video_track_updated(track_change, cx).log_err()
115 });
116 }
117 });
118
119 cx.foreground()
120 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
121 .detach_and_log_err(cx);
122
123 Some(LiveKitRoom {
124 room,
125 screen_track: ScreenTrack::None,
126 next_publish_id: 0,
127 _maintain_room,
128 _maintain_tracks,
129 })
130 } else {
131 None
132 };
133
134 Self {
135 id,
136 live_kit: live_kit_room,
137 status: RoomStatus::Online,
138 participant_user_ids: Default::default(),
139 local_participant: Default::default(),
140 remote_participants: Default::default(),
141 pending_participants: Default::default(),
142 pending_call_count: 0,
143 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
144 leave_when_empty: false,
145 pending_room_update: None,
146 client,
147 user_store,
148 }
149 }
150
151 pub(crate) fn create(
152 recipient_user_id: u64,
153 initial_project: Option<ModelHandle<Project>>,
154 client: Arc<Client>,
155 user_store: ModelHandle<UserStore>,
156 cx: &mut MutableAppContext,
157 ) -> Task<Result<ModelHandle<Self>>> {
158 cx.spawn(|mut cx| async move {
159 let response = client.request(proto::CreateRoom {}).await?;
160 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
161 let room = cx.add_model(|cx| {
162 Self::new(
163 room_proto.id,
164 response.live_kit_connection_info,
165 client,
166 user_store,
167 cx,
168 )
169 });
170
171 let initial_project_id = if let Some(initial_project) = initial_project {
172 let initial_project_id = room
173 .update(&mut cx, |room, cx| {
174 room.share_project(initial_project.clone(), cx)
175 })
176 .await?;
177 Some(initial_project_id)
178 } else {
179 None
180 };
181
182 match room
183 .update(&mut cx, |room, cx| {
184 room.leave_when_empty = true;
185 room.call(recipient_user_id, initial_project_id, cx)
186 })
187 .await
188 {
189 Ok(()) => Ok(room),
190 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
191 }
192 })
193 }
194
195 pub(crate) fn join(
196 call: &IncomingCall,
197 client: Arc<Client>,
198 user_store: ModelHandle<UserStore>,
199 cx: &mut MutableAppContext,
200 ) -> Task<Result<ModelHandle<Self>>> {
201 let room_id = call.room_id;
202 cx.spawn(|mut cx| async move {
203 let response = client.request(proto::JoinRoom { id: room_id }).await?;
204 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
205 let room = cx.add_model(|cx| {
206 Self::new(
207 room_id,
208 response.live_kit_connection_info,
209 client,
210 user_store,
211 cx,
212 )
213 });
214 room.update(&mut cx, |room, cx| {
215 room.leave_when_empty = true;
216 room.apply_room_update(room_proto, cx)?;
217 anyhow::Ok(())
218 })?;
219 Ok(room)
220 })
221 }
222
223 fn should_leave(&self) -> bool {
224 self.leave_when_empty
225 && self.pending_room_update.is_none()
226 && self.pending_participants.is_empty()
227 && self.remote_participants.is_empty()
228 && self.pending_call_count == 0
229 }
230
231 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
232 if self.status.is_offline() {
233 return Err(anyhow!("room is offline"));
234 }
235
236 cx.notify();
237 cx.emit(Event::Left);
238 self.status = RoomStatus::Offline;
239 self.remote_participants.clear();
240 self.pending_participants.clear();
241 self.participant_user_ids.clear();
242 self.subscriptions.clear();
243 self.live_kit.take();
244 self.client.send(proto::LeaveRoom { id: self.id })?;
245 Ok(())
246 }
247
248 pub fn id(&self) -> u64 {
249 self.id
250 }
251
252 pub fn status(&self) -> RoomStatus {
253 self.status
254 }
255
256 pub fn local_participant(&self) -> &LocalParticipant {
257 &self.local_participant
258 }
259
260 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
261 &self.remote_participants
262 }
263
264 pub fn pending_participants(&self) -> &[Arc<User>] {
265 &self.pending_participants
266 }
267
268 pub fn contains_participant(&self, user_id: u64) -> bool {
269 self.participant_user_ids.contains(&user_id)
270 }
271
272 async fn handle_room_updated(
273 this: ModelHandle<Self>,
274 envelope: TypedEnvelope<proto::RoomUpdated>,
275 _: Arc<Client>,
276 mut cx: AsyncAppContext,
277 ) -> Result<()> {
278 let room = envelope
279 .payload
280 .room
281 .ok_or_else(|| anyhow!("invalid room"))?;
282 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
283 }
284
285 fn apply_room_update(
286 &mut self,
287 mut room: proto::Room,
288 cx: &mut ModelContext<Self>,
289 ) -> Result<()> {
290 // Filter ourselves out from the room's participants.
291 let local_participant_ix = room
292 .participants
293 .iter()
294 .position(|participant| Some(participant.user_id) == self.client.user_id());
295 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
296
297 let remote_participant_user_ids = room
298 .participants
299 .iter()
300 .map(|p| p.user_id)
301 .collect::<Vec<_>>();
302 let (remote_participants, pending_participants) =
303 self.user_store.update(cx, move |user_store, cx| {
304 (
305 user_store.get_users(remote_participant_user_ids, cx),
306 user_store.get_users(room.pending_participant_user_ids, cx),
307 )
308 });
309 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
310 let (remote_participants, pending_participants) =
311 futures::join!(remote_participants, pending_participants);
312
313 this.update(&mut cx, |this, cx| {
314 this.participant_user_ids.clear();
315
316 if let Some(participant) = local_participant {
317 this.local_participant.projects = participant.projects;
318 } else {
319 this.local_participant.projects.clear();
320 }
321
322 if let Some(participants) = remote_participants.log_err() {
323 for (participant, user) in room.participants.into_iter().zip(participants) {
324 let peer_id = PeerId(participant.peer_id);
325 this.participant_user_ids.insert(participant.user_id);
326
327 let old_projects = this
328 .remote_participants
329 .get(&peer_id)
330 .into_iter()
331 .flat_map(|existing| &existing.projects)
332 .map(|project| project.id)
333 .collect::<HashSet<_>>();
334 let new_projects = participant
335 .projects
336 .iter()
337 .map(|project| project.id)
338 .collect::<HashSet<_>>();
339
340 for project in &participant.projects {
341 if !old_projects.contains(&project.id) {
342 cx.emit(Event::RemoteProjectShared {
343 owner: user.clone(),
344 project_id: project.id,
345 worktree_root_names: project.worktree_root_names.clone(),
346 });
347 }
348 }
349
350 for unshared_project_id in old_projects.difference(&new_projects) {
351 cx.emit(Event::RemoteProjectUnshared {
352 project_id: *unshared_project_id,
353 });
354 }
355
356 let location = ParticipantLocation::from_proto(participant.location)
357 .unwrap_or(ParticipantLocation::External);
358 if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
359 {
360 remote_participant.projects = participant.projects;
361 if location != remote_participant.location {
362 remote_participant.location = location;
363 cx.emit(Event::ParticipantLocationChanged {
364 participant_id: peer_id,
365 });
366 }
367 } else {
368 this.remote_participants.insert(
369 peer_id,
370 RemoteParticipant {
371 user: user.clone(),
372 projects: participant.projects,
373 location,
374 tracks: Default::default(),
375 },
376 );
377
378 if let Some(live_kit) = this.live_kit.as_ref() {
379 let tracks =
380 live_kit.room.remote_video_tracks(&peer_id.0.to_string());
381 for track in tracks {
382 this.remote_video_track_updated(
383 RemoteVideoTrackUpdate::Subscribed(track),
384 cx,
385 )
386 .log_err();
387 }
388 }
389 }
390 }
391
392 this.remote_participants.retain(|_, participant| {
393 if this.participant_user_ids.contains(&participant.user.id) {
394 true
395 } else {
396 for project in &participant.projects {
397 cx.emit(Event::RemoteProjectUnshared {
398 project_id: project.id,
399 });
400 }
401 false
402 }
403 });
404 }
405
406 if let Some(pending_participants) = pending_participants.log_err() {
407 this.pending_participants = pending_participants;
408 for participant in &this.pending_participants {
409 this.participant_user_ids.insert(participant.id);
410 }
411 }
412
413 this.pending_room_update.take();
414 if this.should_leave() {
415 let _ = this.leave(cx);
416 }
417
418 this.check_invariants();
419 cx.notify();
420 });
421 }));
422
423 cx.notify();
424 Ok(())
425 }
426
427 fn remote_video_track_updated(
428 &mut self,
429 change: RemoteVideoTrackUpdate,
430 cx: &mut ModelContext<Self>,
431 ) -> Result<()> {
432 match change {
433 RemoteVideoTrackUpdate::Subscribed(track) => {
434 let peer_id = PeerId(track.publisher_id().parse()?);
435 let track_id = track.sid().to_string();
436 let participant = self
437 .remote_participants
438 .get_mut(&peer_id)
439 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
440 participant.tracks.insert(
441 track_id.clone(),
442 Arc::new(RemoteVideoTrack {
443 live_kit_track: track,
444 }),
445 );
446 cx.emit(Event::RemoteVideoTracksChanged {
447 participant_id: peer_id,
448 });
449 }
450 RemoteVideoTrackUpdate::Unsubscribed {
451 publisher_id,
452 track_id,
453 } => {
454 let peer_id = PeerId(publisher_id.parse()?);
455 let participant = self
456 .remote_participants
457 .get_mut(&peer_id)
458 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
459 participant.tracks.remove(&track_id);
460 cx.emit(Event::RemoteVideoTracksChanged {
461 participant_id: peer_id,
462 });
463 }
464 }
465
466 cx.notify();
467 Ok(())
468 }
469
470 fn check_invariants(&self) {
471 #[cfg(any(test, feature = "test-support"))]
472 {
473 for participant in self.remote_participants.values() {
474 assert!(self.participant_user_ids.contains(&participant.user.id));
475 }
476
477 for participant in &self.pending_participants {
478 assert!(self.participant_user_ids.contains(&participant.id));
479 }
480
481 assert_eq!(
482 self.participant_user_ids.len(),
483 self.remote_participants.len() + self.pending_participants.len()
484 );
485 }
486 }
487
488 pub(crate) fn call(
489 &mut self,
490 recipient_user_id: u64,
491 initial_project_id: Option<u64>,
492 cx: &mut ModelContext<Self>,
493 ) -> Task<Result<()>> {
494 if self.status.is_offline() {
495 return Task::ready(Err(anyhow!("room is offline")));
496 }
497
498 cx.notify();
499 let client = self.client.clone();
500 let room_id = self.id;
501 self.pending_call_count += 1;
502 cx.spawn(|this, mut cx| async move {
503 let result = client
504 .request(proto::Call {
505 room_id,
506 recipient_user_id,
507 initial_project_id,
508 })
509 .await;
510 this.update(&mut cx, |this, cx| {
511 this.pending_call_count -= 1;
512 if this.should_leave() {
513 this.leave(cx)?;
514 }
515 result
516 })?;
517 Ok(())
518 })
519 }
520
521 pub(crate) fn share_project(
522 &mut self,
523 project: ModelHandle<Project>,
524 cx: &mut ModelContext<Self>,
525 ) -> Task<Result<u64>> {
526 if let Some(project_id) = project.read(cx).remote_id() {
527 return Task::ready(Ok(project_id));
528 }
529
530 let request = self.client.request(proto::ShareProject {
531 room_id: self.id(),
532 worktrees: project
533 .read(cx)
534 .worktrees(cx)
535 .map(|worktree| {
536 let worktree = worktree.read(cx);
537 proto::WorktreeMetadata {
538 id: worktree.id().to_proto(),
539 root_name: worktree.root_name().into(),
540 visible: worktree.is_visible(),
541 abs_path: worktree.abs_path().as_os_str().as_bytes().to_vec(),
542 }
543 })
544 .collect(),
545 });
546 cx.spawn(|this, mut cx| async move {
547 let response = request.await?;
548
549 project.update(&mut cx, |project, cx| {
550 project
551 .shared(response.project_id, cx)
552 .detach_and_log_err(cx)
553 });
554
555 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
556 this.update(&mut cx, |this, cx| {
557 let active_project = this.local_participant.active_project.as_ref();
558 if active_project.map_or(false, |location| *location == project) {
559 this.set_location(Some(&project), cx)
560 } else {
561 Task::ready(Ok(()))
562 }
563 })
564 .await?;
565
566 Ok(response.project_id)
567 })
568 }
569
570 pub(crate) fn set_location(
571 &mut self,
572 project: Option<&ModelHandle<Project>>,
573 cx: &mut ModelContext<Self>,
574 ) -> Task<Result<()>> {
575 if self.status.is_offline() {
576 return Task::ready(Err(anyhow!("room is offline")));
577 }
578
579 let client = self.client.clone();
580 let room_id = self.id;
581 let location = if let Some(project) = project {
582 self.local_participant.active_project = Some(project.downgrade());
583 if let Some(project_id) = project.read(cx).remote_id() {
584 proto::participant_location::Variant::SharedProject(
585 proto::participant_location::SharedProject { id: project_id },
586 )
587 } else {
588 proto::participant_location::Variant::UnsharedProject(
589 proto::participant_location::UnsharedProject {},
590 )
591 }
592 } else {
593 self.local_participant.active_project = None;
594 proto::participant_location::Variant::External(proto::participant_location::External {})
595 };
596
597 cx.notify();
598 cx.foreground().spawn(async move {
599 client
600 .request(proto::UpdateParticipantLocation {
601 room_id,
602 location: Some(proto::ParticipantLocation {
603 variant: Some(location),
604 }),
605 })
606 .await?;
607 Ok(())
608 })
609 }
610
611 pub fn is_screen_sharing(&self) -> bool {
612 self.live_kit.as_ref().map_or(false, |live_kit| {
613 !matches!(live_kit.screen_track, ScreenTrack::None)
614 })
615 }
616
617 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
618 if self.status.is_offline() {
619 return Task::ready(Err(anyhow!("room is offline")));
620 } else if self.is_screen_sharing() {
621 return Task::ready(Err(anyhow!("screen was already shared")));
622 }
623
624 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
625 let publish_id = post_inc(&mut live_kit.next_publish_id);
626 live_kit.screen_track = ScreenTrack::Pending { publish_id };
627 cx.notify();
628 (live_kit.room.display_sources(), publish_id)
629 } else {
630 return Task::ready(Err(anyhow!("live-kit was not initialized")));
631 };
632
633 cx.spawn_weak(|this, mut cx| async move {
634 let publish_track = async {
635 let displays = displays.await?;
636 let display = displays
637 .first()
638 .ok_or_else(|| anyhow!("no display found"))?;
639 let track = LocalVideoTrack::screen_share_for_display(&display);
640 this.upgrade(&cx)
641 .ok_or_else(|| anyhow!("room was dropped"))?
642 .read_with(&cx, |this, _| {
643 this.live_kit
644 .as_ref()
645 .map(|live_kit| live_kit.room.publish_video_track(&track))
646 })
647 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
648 .await
649 };
650
651 let publication = publish_track.await;
652 this.upgrade(&cx)
653 .ok_or_else(|| anyhow!("room was dropped"))?
654 .update(&mut cx, |this, cx| {
655 let live_kit = this
656 .live_kit
657 .as_mut()
658 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
659
660 let canceled = if let ScreenTrack::Pending {
661 publish_id: cur_publish_id,
662 } = &live_kit.screen_track
663 {
664 *cur_publish_id != publish_id
665 } else {
666 true
667 };
668
669 match publication {
670 Ok(publication) => {
671 if canceled {
672 live_kit.room.unpublish_track(publication);
673 } else {
674 live_kit.screen_track = ScreenTrack::Published(publication);
675 cx.notify();
676 }
677 Ok(())
678 }
679 Err(error) => {
680 if canceled {
681 Ok(())
682 } else {
683 live_kit.screen_track = ScreenTrack::None;
684 cx.notify();
685 Err(error)
686 }
687 }
688 }
689 })
690 })
691 }
692
693 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
694 if self.status.is_offline() {
695 return Err(anyhow!("room is offline"));
696 }
697
698 let live_kit = self
699 .live_kit
700 .as_mut()
701 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
702 match mem::take(&mut live_kit.screen_track) {
703 ScreenTrack::None => Err(anyhow!("screen was not shared")),
704 ScreenTrack::Pending { .. } => {
705 cx.notify();
706 Ok(())
707 }
708 ScreenTrack::Published(track) => {
709 live_kit.room.unpublish_track(track);
710 cx.notify();
711 Ok(())
712 }
713 }
714 }
715
716 #[cfg(any(test, feature = "test-support"))]
717 pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
718 self.live_kit
719 .as_ref()
720 .unwrap()
721 .room
722 .set_display_sources(sources);
723 }
724}
725
726struct LiveKitRoom {
727 room: Arc<live_kit_client::Room>,
728 screen_track: ScreenTrack,
729 next_publish_id: usize,
730 _maintain_room: Task<()>,
731 _maintain_tracks: Task<()>,
732}
733
734enum ScreenTrack {
735 None,
736 Pending { publish_id: usize },
737 Published(LocalTrackPublication),
738}
739
740impl Default for ScreenTrack {
741 fn default() -> Self {
742 Self::None
743 }
744}
745
746#[derive(Copy, Clone, PartialEq, Eq)]
747pub enum RoomStatus {
748 Online,
749 Offline,
750}
751
752impl RoomStatus {
753 pub fn is_offline(&self) -> bool {
754 matches!(self, RoomStatus::Offline)
755 }
756
757 pub fn is_online(&self) -> bool {
758 matches!(self, RoomStatus::Online)
759 }
760}