1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate};
11use postage::watch;
12use project::Project;
13use std::sync::Arc;
14use util::ResultExt;
15
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub enum Event {
18 Frame {
19 participant_id: PeerId,
20 track_id: live_kit_client::Sid,
21 },
22 RemoteProjectShared {
23 owner: Arc<User>,
24 project_id: u64,
25 worktree_root_names: Vec<String>,
26 },
27 RemoteProjectUnshared {
28 project_id: u64,
29 },
30 Left,
31}
32
33pub struct Room {
34 id: u64,
35 live_kit_room: Option<(Arc<live_kit_client::Room>, Task<()>)>,
36 status: RoomStatus,
37 local_participant: LocalParticipant,
38 remote_participants: BTreeMap<PeerId, RemoteParticipant>,
39 pending_participants: Vec<Arc<User>>,
40 participant_user_ids: HashSet<u64>,
41 pending_call_count: usize,
42 leave_when_empty: bool,
43 client: Arc<Client>,
44 user_store: ModelHandle<UserStore>,
45 subscriptions: Vec<client::Subscription>,
46 pending_room_update: Option<Task<()>>,
47}
48
49impl Entity for Room {
50 type Event = Event;
51
52 fn release(&mut self, _: &mut MutableAppContext) {
53 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
54 }
55}
56
57impl Room {
58 fn new(
59 id: u64,
60 live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
61 client: Arc<Client>,
62 user_store: ModelHandle<UserStore>,
63 cx: &mut ModelContext<Self>,
64 ) -> Self {
65 let mut client_status = client.status();
66 cx.spawn_weak(|this, mut cx| async move {
67 let is_connected = client_status
68 .next()
69 .await
70 .map_or(false, |s| s.is_connected());
71 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
72 if !is_connected || client_status.next().await.is_some() {
73 if let Some(this) = this.upgrade(&cx) {
74 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
75 }
76 }
77 })
78 .detach();
79
80 let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
81 let room = live_kit_client::Room::new();
82 let mut track_changes = room.remote_video_track_updates();
83 let maintain_room = cx.spawn_weak(|this, mut cx| async move {
84 while let Some(track_change) = track_changes.next().await {
85 let this = if let Some(this) = this.upgrade(&cx) {
86 this
87 } else {
88 break;
89 };
90
91 this.update(&mut cx, |this, cx| {
92 this.remote_video_track_updated(track_change, cx).log_err()
93 });
94 }
95 });
96 cx.foreground()
97 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
98 .detach_and_log_err(cx);
99 Some((room, maintain_room))
100 } else {
101 None
102 };
103
104 Self {
105 id,
106 live_kit_room,
107 status: RoomStatus::Online,
108 participant_user_ids: Default::default(),
109 local_participant: Default::default(),
110 remote_participants: Default::default(),
111 pending_participants: Default::default(),
112 pending_call_count: 0,
113 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
114 leave_when_empty: false,
115 pending_room_update: None,
116 client,
117 user_store,
118 }
119 }
120
121 pub(crate) fn create(
122 recipient_user_id: u64,
123 initial_project: Option<ModelHandle<Project>>,
124 client: Arc<Client>,
125 user_store: ModelHandle<UserStore>,
126 cx: &mut MutableAppContext,
127 ) -> Task<Result<ModelHandle<Self>>> {
128 cx.spawn(|mut cx| async move {
129 let response = client.request(proto::CreateRoom {}).await?;
130 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
131 let room = cx.add_model(|cx| {
132 Self::new(
133 room_proto.id,
134 response.live_kit_connection_info,
135 client,
136 user_store,
137 cx,
138 )
139 });
140
141 let initial_project_id = if let Some(initial_project) = initial_project {
142 let initial_project_id = room
143 .update(&mut cx, |room, cx| {
144 room.share_project(initial_project.clone(), cx)
145 })
146 .await?;
147 Some(initial_project_id)
148 } else {
149 None
150 };
151
152 match room
153 .update(&mut cx, |room, cx| {
154 room.leave_when_empty = true;
155 room.call(recipient_user_id, initial_project_id, cx)
156 })
157 .await
158 {
159 Ok(()) => Ok(room),
160 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
161 }
162 })
163 }
164
165 pub(crate) fn join(
166 call: &IncomingCall,
167 client: Arc<Client>,
168 user_store: ModelHandle<UserStore>,
169 cx: &mut MutableAppContext,
170 ) -> Task<Result<ModelHandle<Self>>> {
171 let room_id = call.room_id;
172 cx.spawn(|mut cx| async move {
173 let response = client.request(proto::JoinRoom { id: room_id }).await?;
174 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
175 let room = cx.add_model(|cx| {
176 Self::new(
177 room_id,
178 response.live_kit_connection_info,
179 client,
180 user_store,
181 cx,
182 )
183 });
184 room.update(&mut cx, |room, cx| {
185 room.leave_when_empty = true;
186 room.apply_room_update(room_proto, cx)?;
187 anyhow::Ok(())
188 })?;
189 Ok(room)
190 })
191 }
192
193 fn should_leave(&self) -> bool {
194 self.leave_when_empty
195 && self.pending_room_update.is_none()
196 && self.pending_participants.is_empty()
197 && self.remote_participants.is_empty()
198 && self.pending_call_count == 0
199 }
200
201 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
202 if self.status.is_offline() {
203 return Err(anyhow!("room is offline"));
204 }
205
206 cx.notify();
207 cx.emit(Event::Left);
208 self.status = RoomStatus::Offline;
209 self.remote_participants.clear();
210 self.pending_participants.clear();
211 self.participant_user_ids.clear();
212 self.subscriptions.clear();
213 self.client.send(proto::LeaveRoom { id: self.id })?;
214 Ok(())
215 }
216
217 pub fn id(&self) -> u64 {
218 self.id
219 }
220
221 pub fn status(&self) -> RoomStatus {
222 self.status
223 }
224
225 pub fn local_participant(&self) -> &LocalParticipant {
226 &self.local_participant
227 }
228
229 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
230 &self.remote_participants
231 }
232
233 pub fn pending_participants(&self) -> &[Arc<User>] {
234 &self.pending_participants
235 }
236
237 pub fn contains_participant(&self, user_id: u64) -> bool {
238 self.participant_user_ids.contains(&user_id)
239 }
240
241 async fn handle_room_updated(
242 this: ModelHandle<Self>,
243 envelope: TypedEnvelope<proto::RoomUpdated>,
244 _: Arc<Client>,
245 mut cx: AsyncAppContext,
246 ) -> Result<()> {
247 let room = envelope
248 .payload
249 .room
250 .ok_or_else(|| anyhow!("invalid room"))?;
251 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
252 }
253
254 fn apply_room_update(
255 &mut self,
256 mut room: proto::Room,
257 cx: &mut ModelContext<Self>,
258 ) -> Result<()> {
259 // Filter ourselves out from the room's participants.
260 let local_participant_ix = room
261 .participants
262 .iter()
263 .position(|participant| Some(participant.user_id) == self.client.user_id());
264 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
265
266 let remote_participant_user_ids = room
267 .participants
268 .iter()
269 .map(|p| p.user_id)
270 .collect::<Vec<_>>();
271 let (remote_participants, pending_participants) =
272 self.user_store.update(cx, move |user_store, cx| {
273 (
274 user_store.get_users(remote_participant_user_ids, cx),
275 user_store.get_users(room.pending_participant_user_ids, cx),
276 )
277 });
278 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
279 let (remote_participants, pending_participants) =
280 futures::join!(remote_participants, pending_participants);
281
282 this.update(&mut cx, |this, cx| {
283 this.participant_user_ids.clear();
284
285 if let Some(participant) = local_participant {
286 this.local_participant.projects = participant.projects;
287 } else {
288 this.local_participant.projects.clear();
289 }
290
291 if let Some(participants) = remote_participants.log_err() {
292 for (participant, user) in room.participants.into_iter().zip(participants) {
293 let peer_id = PeerId(participant.peer_id);
294 this.participant_user_ids.insert(participant.user_id);
295
296 let old_projects = this
297 .remote_participants
298 .get(&peer_id)
299 .into_iter()
300 .flat_map(|existing| &existing.projects)
301 .map(|project| project.id)
302 .collect::<HashSet<_>>();
303 let new_projects = participant
304 .projects
305 .iter()
306 .map(|project| project.id)
307 .collect::<HashSet<_>>();
308
309 for project in &participant.projects {
310 if !old_projects.contains(&project.id) {
311 cx.emit(Event::RemoteProjectShared {
312 owner: user.clone(),
313 project_id: project.id,
314 worktree_root_names: project.worktree_root_names.clone(),
315 });
316 }
317 }
318
319 for unshared_project_id in old_projects.difference(&new_projects) {
320 cx.emit(Event::RemoteProjectUnshared {
321 project_id: *unshared_project_id,
322 });
323 }
324
325 let location = ParticipantLocation::from_proto(participant.location)
326 .unwrap_or(ParticipantLocation::External);
327 if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
328 {
329 remote_participant.projects = participant.projects;
330 remote_participant.location = location;
331 } else {
332 this.remote_participants.insert(
333 peer_id,
334 RemoteParticipant {
335 user: user.clone(),
336 projects: participant.projects,
337 location,
338 tracks: Default::default(),
339 },
340 );
341
342 if let Some((room, _)) = this.live_kit_room.as_ref() {
343 let tracks = room.remote_video_tracks(&peer_id.0.to_string());
344 for track in tracks {
345 this.remote_video_track_updated(
346 RemoteVideoTrackUpdate::Subscribed(track),
347 cx,
348 )
349 .log_err();
350 }
351 }
352 }
353 }
354
355 this.remote_participants.retain(|_, participant| {
356 if this.participant_user_ids.contains(&participant.user.id) {
357 true
358 } else {
359 for project in &participant.projects {
360 cx.emit(Event::RemoteProjectUnshared {
361 project_id: project.id,
362 });
363 }
364 false
365 }
366 });
367 }
368
369 if let Some(pending_participants) = pending_participants.log_err() {
370 this.pending_participants = pending_participants;
371 for participant in &this.pending_participants {
372 this.participant_user_ids.insert(participant.id);
373 }
374 }
375
376 this.pending_room_update.take();
377 if this.should_leave() {
378 let _ = this.leave(cx);
379 }
380
381 this.check_invariants();
382 cx.notify();
383 });
384 }));
385
386 cx.notify();
387 Ok(())
388 }
389
390 fn remote_video_track_updated(
391 &mut self,
392 change: RemoteVideoTrackUpdate,
393 cx: &mut ModelContext<Self>,
394 ) -> Result<()> {
395 match change {
396 RemoteVideoTrackUpdate::Subscribed(track) => {
397 let peer_id = PeerId(track.publisher_id().parse()?);
398 let track_id = track.sid().to_string();
399 let participant = self
400 .remote_participants
401 .get_mut(&peer_id)
402 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
403 let (mut tx, mut rx) = watch::channel();
404 track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame));
405 participant.tracks.insert(
406 track_id.clone(),
407 RemoteVideoTrack {
408 frame: None,
409 _live_kit_track: track,
410 _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
411 while let Some(frame) = rx.next().await {
412 let this = if let Some(this) = this.upgrade(&cx) {
413 this
414 } else {
415 break;
416 };
417
418 let done = this.update(&mut cx, |this, cx| {
419 if let Some(track) =
420 this.remote_participants.get_mut(&peer_id).and_then(
421 |participant| participant.tracks.get_mut(&track_id),
422 )
423 {
424 track.frame = frame;
425 cx.emit(Event::Frame {
426 participant_id: peer_id,
427 track_id: track_id.clone(),
428 });
429 false
430 } else {
431 true
432 }
433 });
434
435 if done {
436 break;
437 }
438 }
439 })),
440 },
441 );
442 }
443 RemoteVideoTrackUpdate::Unsubscribed {
444 publisher_id,
445 track_id,
446 } => {
447 let peer_id = PeerId(publisher_id.parse()?);
448 let participant = self
449 .remote_participants
450 .get_mut(&peer_id)
451 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
452 participant.tracks.remove(&track_id);
453 }
454 }
455
456 cx.notify();
457 Ok(())
458 }
459
460 fn check_invariants(&self) {
461 #[cfg(any(test, feature = "test-support"))]
462 {
463 for participant in self.remote_participants.values() {
464 assert!(self.participant_user_ids.contains(&participant.user.id));
465 }
466
467 for participant in &self.pending_participants {
468 assert!(self.participant_user_ids.contains(&participant.id));
469 }
470
471 assert_eq!(
472 self.participant_user_ids.len(),
473 self.remote_participants.len() + self.pending_participants.len()
474 );
475 }
476 }
477
478 pub(crate) fn call(
479 &mut self,
480 recipient_user_id: u64,
481 initial_project_id: Option<u64>,
482 cx: &mut ModelContext<Self>,
483 ) -> Task<Result<()>> {
484 if self.status.is_offline() {
485 return Task::ready(Err(anyhow!("room is offline")));
486 }
487
488 cx.notify();
489 let client = self.client.clone();
490 let room_id = self.id;
491 self.pending_call_count += 1;
492 cx.spawn(|this, mut cx| async move {
493 let result = client
494 .request(proto::Call {
495 room_id,
496 recipient_user_id,
497 initial_project_id,
498 })
499 .await;
500 this.update(&mut cx, |this, cx| {
501 this.pending_call_count -= 1;
502 if this.should_leave() {
503 this.leave(cx)?;
504 }
505 result
506 })?;
507 Ok(())
508 })
509 }
510
511 pub(crate) fn share_project(
512 &mut self,
513 project: ModelHandle<Project>,
514 cx: &mut ModelContext<Self>,
515 ) -> Task<Result<u64>> {
516 if let Some(project_id) = project.read(cx).remote_id() {
517 return Task::ready(Ok(project_id));
518 }
519
520 let request = self.client.request(proto::ShareProject {
521 room_id: self.id(),
522 worktrees: project
523 .read(cx)
524 .worktrees(cx)
525 .map(|worktree| {
526 let worktree = worktree.read(cx);
527 proto::WorktreeMetadata {
528 id: worktree.id().to_proto(),
529 root_name: worktree.root_name().into(),
530 visible: worktree.is_visible(),
531 }
532 })
533 .collect(),
534 });
535 cx.spawn(|this, mut cx| async move {
536 let response = request.await?;
537
538 project.update(&mut cx, |project, cx| {
539 project
540 .shared(response.project_id, cx)
541 .detach_and_log_err(cx)
542 });
543
544 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
545 this.update(&mut cx, |this, cx| {
546 let active_project = this.local_participant.active_project.as_ref();
547 if active_project.map_or(false, |location| *location == project) {
548 this.set_location(Some(&project), cx)
549 } else {
550 Task::ready(Ok(()))
551 }
552 })
553 .await?;
554
555 Ok(response.project_id)
556 })
557 }
558
559 pub fn set_location(
560 &mut self,
561 project: Option<&ModelHandle<Project>>,
562 cx: &mut ModelContext<Self>,
563 ) -> Task<Result<()>> {
564 if self.status.is_offline() {
565 return Task::ready(Err(anyhow!("room is offline")));
566 }
567
568 let client = self.client.clone();
569 let room_id = self.id;
570 let location = if let Some(project) = project {
571 self.local_participant.active_project = Some(project.downgrade());
572 if let Some(project_id) = project.read(cx).remote_id() {
573 proto::participant_location::Variant::SharedProject(
574 proto::participant_location::SharedProject { id: project_id },
575 )
576 } else {
577 proto::participant_location::Variant::UnsharedProject(
578 proto::participant_location::UnsharedProject {},
579 )
580 }
581 } else {
582 self.local_participant.active_project = None;
583 proto::participant_location::Variant::External(proto::participant_location::External {})
584 };
585
586 cx.notify();
587 cx.foreground().spawn(async move {
588 client
589 .request(proto::UpdateParticipantLocation {
590 room_id,
591 location: Some(proto::ParticipantLocation {
592 variant: Some(location),
593 }),
594 })
595 .await?;
596 Ok(())
597 })
598 }
599
600 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
601 if self.status.is_offline() {
602 return Task::ready(Err(anyhow!("room is offline")));
603 }
604
605 let room = if let Some((room, _)) = self.live_kit_room.as_ref() {
606 room.clone()
607 } else {
608 return Task::ready(Err(anyhow!("not connected to LiveKit")));
609 };
610
611 cx.foreground().spawn(async move {
612 let displays = live_kit_client::display_sources().await?;
613 let display = displays
614 .first()
615 .ok_or_else(|| anyhow!("no display found"))?;
616 let track = LocalVideoTrack::screen_share_for_display(&display);
617 room.publish_video_track(&track).await?;
618 Ok(())
619 })
620 }
621}
622
623#[derive(Copy, Clone, PartialEq, Eq)]
624pub enum RoomStatus {
625 Online,
626 Offline,
627}
628
629impl RoomStatus {
630 pub fn is_offline(&self) -> bool {
631 matches!(self, RoomStatus::Offline)
632 }
633}