1use crate::{
2 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
3 IncomingCall,
4};
5use anyhow::{anyhow, Result};
6use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
7use collections::{BTreeMap, HashSet};
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
10use project::Project;
11use std::sync::Arc;
12use util::ResultExt;
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub enum Event {
16 RemoteProjectShared {
17 owner: Arc<User>,
18 project_id: u64,
19 worktree_root_names: Vec<String>,
20 },
21 RemoteProjectUnshared {
22 project_id: u64,
23 },
24 Left,
25}
26
27pub struct Room {
28 id: u64,
29 status: RoomStatus,
30 local_participant: LocalParticipant,
31 remote_participants: BTreeMap<PeerId, RemoteParticipant>,
32 pending_participants: Vec<Arc<User>>,
33 participant_user_ids: HashSet<u64>,
34 pending_call_count: usize,
35 leave_when_empty: bool,
36 client: Arc<Client>,
37 user_store: ModelHandle<UserStore>,
38 subscriptions: Vec<client::Subscription>,
39 pending_room_update: Option<Task<()>>,
40}
41
42impl Entity for Room {
43 type Event = Event;
44
45 fn release(&mut self, _: &mut MutableAppContext) {
46 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
47 }
48}
49
50impl Room {
51 fn new(
52 id: u64,
53 client: Arc<Client>,
54 user_store: ModelHandle<UserStore>,
55 cx: &mut ModelContext<Self>,
56 ) -> Self {
57 let mut client_status = client.status();
58 cx.spawn_weak(|this, mut cx| async move {
59 let is_connected = client_status
60 .next()
61 .await
62 .map_or(false, |s| s.is_connected());
63 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
64 if !is_connected || client_status.next().await.is_some() {
65 if let Some(this) = this.upgrade(&cx) {
66 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
67 }
68 }
69 })
70 .detach();
71
72 Self {
73 id,
74 status: RoomStatus::Online,
75 participant_user_ids: Default::default(),
76 local_participant: Default::default(),
77 remote_participants: Default::default(),
78 pending_participants: Default::default(),
79 pending_call_count: 0,
80 subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
81 leave_when_empty: false,
82 pending_room_update: None,
83 client,
84 user_store,
85 }
86 }
87
88 pub(crate) fn create(
89 recipient_user_id: u64,
90 initial_project: Option<ModelHandle<Project>>,
91 client: Arc<Client>,
92 user_store: ModelHandle<UserStore>,
93 cx: &mut MutableAppContext,
94 ) -> Task<Result<ModelHandle<Self>>> {
95 cx.spawn(|mut cx| async move {
96 let response = client.request(proto::CreateRoom {}).await?;
97 let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
98
99 let initial_project_id = if let Some(initial_project) = initial_project {
100 let initial_project_id = room
101 .update(&mut cx, |room, cx| {
102 room.share_project(initial_project.clone(), cx)
103 })
104 .await?;
105 Some(initial_project_id)
106 } else {
107 None
108 };
109
110 match room
111 .update(&mut cx, |room, cx| {
112 room.leave_when_empty = true;
113 room.call(recipient_user_id, initial_project_id, cx)
114 })
115 .await
116 {
117 Ok(()) => Ok(room),
118 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
119 }
120 })
121 }
122
123 pub(crate) fn join(
124 call: &IncomingCall,
125 client: Arc<Client>,
126 user_store: ModelHandle<UserStore>,
127 cx: &mut MutableAppContext,
128 ) -> Task<Result<ModelHandle<Self>>> {
129 let room_id = call.room_id;
130 cx.spawn(|mut cx| async move {
131 let response = client.request(proto::JoinRoom { id: room_id }).await?;
132 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
133 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
134 room.update(&mut cx, |room, cx| {
135 room.leave_when_empty = true;
136 room.apply_room_update(room_proto, cx)?;
137 anyhow::Ok(())
138 })?;
139 Ok(room)
140 })
141 }
142
143 fn should_leave(&self) -> bool {
144 self.leave_when_empty
145 && self.pending_room_update.is_none()
146 && self.pending_participants.is_empty()
147 && self.remote_participants.is_empty()
148 && self.pending_call_count == 0
149 }
150
151 pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
152 if self.status.is_offline() {
153 return Err(anyhow!("room is offline"));
154 }
155
156 cx.notify();
157 cx.emit(Event::Left);
158 self.status = RoomStatus::Offline;
159 self.remote_participants.clear();
160 self.pending_participants.clear();
161 self.participant_user_ids.clear();
162 self.subscriptions.clear();
163 self.client.send(proto::LeaveRoom { id: self.id })?;
164 Ok(())
165 }
166
167 pub fn id(&self) -> u64 {
168 self.id
169 }
170
171 pub fn status(&self) -> RoomStatus {
172 self.status
173 }
174
175 pub fn local_participant(&self) -> &LocalParticipant {
176 &self.local_participant
177 }
178
179 pub fn remote_participants(&self) -> &BTreeMap<PeerId, RemoteParticipant> {
180 &self.remote_participants
181 }
182
183 pub fn pending_participants(&self) -> &[Arc<User>] {
184 &self.pending_participants
185 }
186
187 pub fn contains_participant(&self, user_id: u64) -> bool {
188 self.participant_user_ids.contains(&user_id)
189 }
190
191 async fn handle_room_updated(
192 this: ModelHandle<Self>,
193 envelope: TypedEnvelope<proto::RoomUpdated>,
194 _: Arc<Client>,
195 mut cx: AsyncAppContext,
196 ) -> Result<()> {
197 let room = envelope
198 .payload
199 .room
200 .ok_or_else(|| anyhow!("invalid room"))?;
201 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
202 }
203
204 fn apply_room_update(
205 &mut self,
206 mut room: proto::Room,
207 cx: &mut ModelContext<Self>,
208 ) -> Result<()> {
209 // Filter ourselves out from the room's participants.
210 let local_participant_ix = room
211 .participants
212 .iter()
213 .position(|participant| Some(participant.user_id) == self.client.user_id());
214 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
215
216 let remote_participant_user_ids = room
217 .participants
218 .iter()
219 .map(|p| p.user_id)
220 .collect::<Vec<_>>();
221 let (remote_participants, pending_participants) =
222 self.user_store.update(cx, move |user_store, cx| {
223 (
224 user_store.get_users(remote_participant_user_ids, cx),
225 user_store.get_users(room.pending_participant_user_ids, cx),
226 )
227 });
228 self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
229 let (remote_participants, pending_participants) =
230 futures::join!(remote_participants, pending_participants);
231
232 this.update(&mut cx, |this, cx| {
233 this.participant_user_ids.clear();
234
235 if let Some(participant) = local_participant {
236 this.local_participant.projects = participant.projects;
237 } else {
238 this.local_participant.projects.clear();
239 }
240
241 if let Some(participants) = remote_participants.log_err() {
242 for (participant, user) in room.participants.into_iter().zip(participants) {
243 let peer_id = PeerId(participant.peer_id);
244 this.participant_user_ids.insert(participant.user_id);
245
246 let old_projects = this
247 .remote_participants
248 .get(&peer_id)
249 .into_iter()
250 .flat_map(|existing| &existing.projects)
251 .map(|project| project.id)
252 .collect::<HashSet<_>>();
253 let new_projects = participant
254 .projects
255 .iter()
256 .map(|project| project.id)
257 .collect::<HashSet<_>>();
258
259 for project in &participant.projects {
260 if !old_projects.contains(&project.id) {
261 cx.emit(Event::RemoteProjectShared {
262 owner: user.clone(),
263 project_id: project.id,
264 worktree_root_names: project.worktree_root_names.clone(),
265 });
266 }
267 }
268
269 for unshared_project_id in old_projects.difference(&new_projects) {
270 cx.emit(Event::RemoteProjectUnshared {
271 project_id: *unshared_project_id,
272 });
273 }
274
275 this.remote_participants.insert(
276 peer_id,
277 RemoteParticipant {
278 user: user.clone(),
279 projects: participant.projects,
280 location: ParticipantLocation::from_proto(participant.location)
281 .unwrap_or(ParticipantLocation::External),
282 },
283 );
284 }
285
286 this.remote_participants.retain(|_, participant| {
287 if this.participant_user_ids.contains(&participant.user.id) {
288 true
289 } else {
290 for project in &participant.projects {
291 cx.emit(Event::RemoteProjectUnshared {
292 project_id: project.id,
293 });
294 }
295 false
296 }
297 });
298 }
299
300 if let Some(pending_participants) = pending_participants.log_err() {
301 this.pending_participants = pending_participants;
302 for participant in &this.pending_participants {
303 this.participant_user_ids.insert(participant.id);
304 }
305 }
306
307 this.pending_room_update.take();
308 if this.should_leave() {
309 let _ = this.leave(cx);
310 }
311
312 this.check_invariants();
313 cx.notify();
314 });
315 }));
316
317 cx.notify();
318 Ok(())
319 }
320
321 fn check_invariants(&self) {
322 #[cfg(any(test, feature = "test-support"))]
323 {
324 for participant in self.remote_participants.values() {
325 assert!(self.participant_user_ids.contains(&participant.user.id));
326 }
327
328 for participant in &self.pending_participants {
329 assert!(self.participant_user_ids.contains(&participant.id));
330 }
331
332 assert_eq!(
333 self.participant_user_ids.len(),
334 self.remote_participants.len() + self.pending_participants.len()
335 );
336 }
337 }
338
339 pub(crate) fn call(
340 &mut self,
341 recipient_user_id: u64,
342 initial_project_id: Option<u64>,
343 cx: &mut ModelContext<Self>,
344 ) -> Task<Result<()>> {
345 if self.status.is_offline() {
346 return Task::ready(Err(anyhow!("room is offline")));
347 }
348
349 cx.notify();
350 let client = self.client.clone();
351 let room_id = self.id;
352 self.pending_call_count += 1;
353 cx.spawn(|this, mut cx| async move {
354 let result = client
355 .request(proto::Call {
356 room_id,
357 recipient_user_id,
358 initial_project_id,
359 })
360 .await;
361 this.update(&mut cx, |this, cx| {
362 this.pending_call_count -= 1;
363 if this.should_leave() {
364 this.leave(cx)?;
365 }
366 result
367 })?;
368 Ok(())
369 })
370 }
371
372 pub(crate) fn share_project(
373 &mut self,
374 project: ModelHandle<Project>,
375 cx: &mut ModelContext<Self>,
376 ) -> Task<Result<u64>> {
377 if project.read(cx).is_remote() {
378 return Task::ready(Err(anyhow!("can't share remote project")));
379 } else if let Some(project_id) = project.read(cx).remote_id() {
380 return Task::ready(Ok(project_id));
381 }
382
383 let request = self.client.request(proto::ShareProject {
384 room_id: self.id(),
385 worktrees: project
386 .read(cx)
387 .worktrees(cx)
388 .map(|worktree| {
389 let worktree = worktree.read(cx);
390 proto::WorktreeMetadata {
391 id: worktree.id().to_proto(),
392 root_name: worktree.root_name().into(),
393 visible: worktree.is_visible(),
394 }
395 })
396 .collect(),
397 });
398 cx.spawn(|this, mut cx| async move {
399 let response = request.await?;
400
401 project.update(&mut cx, |project, cx| {
402 project
403 .shared(response.project_id, cx)
404 .detach_and_log_err(cx)
405 });
406
407 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
408 this.update(&mut cx, |this, cx| {
409 let active_project = this.local_participant.active_project.as_ref();
410 if active_project.map_or(false, |location| *location == project) {
411 this.set_location(Some(&project), cx)
412 } else {
413 Task::ready(Ok(()))
414 }
415 })
416 .await?;
417
418 Ok(response.project_id)
419 })
420 }
421
422 pub fn set_location(
423 &mut self,
424 project: Option<&ModelHandle<Project>>,
425 cx: &mut ModelContext<Self>,
426 ) -> Task<Result<()>> {
427 if self.status.is_offline() {
428 return Task::ready(Err(anyhow!("room is offline")));
429 }
430
431 let client = self.client.clone();
432 let room_id = self.id;
433 let location = if let Some(project) = project {
434 self.local_participant.active_project = Some(project.downgrade());
435 if let Some(project_id) = project.read(cx).remote_id() {
436 proto::participant_location::Variant::SharedProject(
437 proto::participant_location::SharedProject { id: project_id },
438 )
439 } else {
440 proto::participant_location::Variant::UnsharedProject(
441 proto::participant_location::UnsharedProject {},
442 )
443 }
444 } else {
445 self.local_participant.active_project = None;
446 proto::participant_location::Variant::External(proto::participant_location::External {})
447 };
448
449 cx.notify();
450 cx.foreground().spawn(async move {
451 client
452 .request(proto::UpdateParticipantLocation {
453 room_id,
454 location: Some(proto::ParticipantLocation {
455 variant: Some(location),
456 }),
457 })
458 .await?;
459 Ok(())
460 })
461 }
462}
463
464#[derive(Copy, Clone, PartialEq, Eq)]
465pub enum RoomStatus {
466 Online,
467 Offline,
468}
469
470impl RoomStatus {
471 pub fn is_offline(&self) -> bool {
472 matches!(self, RoomStatus::Offline)
473 }
474}