1use crate::participant::{ParticipantLocation, RemoteParticipant};
2use anyhow::{anyhow, Result};
3use client::{incoming_call::IncomingCall, proto, Client, PeerId, TypedEnvelope, User, UserStore};
4use collections::{HashMap, HashSet};
5use futures::StreamExt;
6use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
7use project::Project;
8use std::sync::Arc;
9use util::ResultExt;
10
11#[derive(Clone, Debug, PartialEq, Eq)]
12pub enum Event {
13 RemoteProjectShared { owner: Arc<User>, project_id: u64 },
14}
15
16pub struct Room {
17 id: u64,
18 status: RoomStatus,
19 remote_participants: HashMap<PeerId, RemoteParticipant>,
20 pending_users: Vec<Arc<User>>,
21 client: Arc<Client>,
22 user_store: ModelHandle<UserStore>,
23 _subscriptions: Vec<client::Subscription>,
24 _pending_room_update: Option<Task<()>>,
25}
26
27impl Entity for Room {
28 type Event = Event;
29
30 fn release(&mut self, _: &mut MutableAppContext) {
31 self.client.send(proto::LeaveRoom { id: self.id }).log_err();
32 }
33}
34
35impl Room {
36 fn new(
37 id: u64,
38 client: Arc<Client>,
39 user_store: ModelHandle<UserStore>,
40 cx: &mut ModelContext<Self>,
41 ) -> Self {
42 let mut client_status = client.status();
43 cx.spawn_weak(|this, mut cx| async move {
44 let is_connected = client_status
45 .next()
46 .await
47 .map_or(false, |s| s.is_connected());
48 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
49 if !is_connected || client_status.next().await.is_some() {
50 if let Some(this) = this.upgrade(&cx) {
51 let _ = this.update(&mut cx, |this, cx| this.leave(cx));
52 }
53 }
54 })
55 .detach();
56
57 Self {
58 id,
59 status: RoomStatus::Online,
60 remote_participants: Default::default(),
61 pending_users: Default::default(),
62 _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
63 _pending_room_update: None,
64 client,
65 user_store,
66 }
67 }
68
69 pub fn create(
70 client: Arc<Client>,
71 user_store: ModelHandle<UserStore>,
72 cx: &mut MutableAppContext,
73 ) -> Task<Result<ModelHandle<Self>>> {
74 cx.spawn(|mut cx| async move {
75 let room = client.request(proto::CreateRoom {}).await?;
76 Ok(cx.add_model(|cx| Self::new(room.id, client, user_store, cx)))
77 })
78 }
79
80 pub fn join(
81 call: &IncomingCall,
82 client: Arc<Client>,
83 user_store: ModelHandle<UserStore>,
84 cx: &mut MutableAppContext,
85 ) -> Task<Result<ModelHandle<Self>>> {
86 let room_id = call.room_id;
87 cx.spawn(|mut cx| async move {
88 let response = client.request(proto::JoinRoom { id: room_id }).await?;
89 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
90 let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
91 room.update(&mut cx, |room, cx| room.apply_room_update(room_proto, cx))?;
92 Ok(room)
93 })
94 }
95
96 pub fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
97 if self.status.is_offline() {
98 return Err(anyhow!("room is offline"));
99 }
100
101 cx.notify();
102 self.status = RoomStatus::Offline;
103 self.remote_participants.clear();
104 self.client.send(proto::LeaveRoom { id: self.id })?;
105 Ok(())
106 }
107
108 pub fn id(&self) -> u64 {
109 self.id
110 }
111
112 pub fn status(&self) -> RoomStatus {
113 self.status
114 }
115
116 pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
117 &self.remote_participants
118 }
119
120 pub fn pending_users(&self) -> &[Arc<User>] {
121 &self.pending_users
122 }
123
124 async fn handle_room_updated(
125 this: ModelHandle<Self>,
126 envelope: TypedEnvelope<proto::RoomUpdated>,
127 _: Arc<Client>,
128 mut cx: AsyncAppContext,
129 ) -> Result<()> {
130 let room = envelope
131 .payload
132 .room
133 .ok_or_else(|| anyhow!("invalid room"))?;
134 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?;
135 Ok(())
136 }
137
138 fn apply_room_update(
139 &mut self,
140 mut room: proto::Room,
141 cx: &mut ModelContext<Self>,
142 ) -> Result<()> {
143 // Filter ourselves out from the room's participants.
144 room.participants
145 .retain(|participant| Some(participant.user_id) != self.client.user_id());
146
147 let participant_user_ids = room
148 .participants
149 .iter()
150 .map(|p| p.user_id)
151 .collect::<Vec<_>>();
152 let (participants, pending_users) = self.user_store.update(cx, move |user_store, cx| {
153 (
154 user_store.get_users(participant_user_ids, cx),
155 user_store.get_users(room.pending_user_ids, cx),
156 )
157 });
158 self._pending_room_update = Some(cx.spawn(|this, mut cx| async move {
159 let (participants, pending_users) = futures::join!(participants, pending_users);
160
161 this.update(&mut cx, |this, cx| {
162 if let Some(participants) = participants.log_err() {
163 let mut seen_participants = HashSet::default();
164
165 for (participant, user) in room.participants.into_iter().zip(participants) {
166 let peer_id = PeerId(participant.peer_id);
167 seen_participants.insert(peer_id);
168
169 let existing_project_ids = this
170 .remote_participants
171 .get(&peer_id)
172 .map(|existing| existing.project_ids.clone())
173 .unwrap_or_default();
174 for project_id in &participant.project_ids {
175 if !existing_project_ids.contains(project_id) {
176 cx.emit(Event::RemoteProjectShared {
177 owner: user.clone(),
178 project_id: *project_id,
179 });
180 }
181 }
182
183 this.remote_participants.insert(
184 peer_id,
185 RemoteParticipant {
186 user: user.clone(),
187 project_ids: participant.project_ids,
188 location: ParticipantLocation::from_proto(participant.location)
189 .unwrap_or(ParticipantLocation::External),
190 },
191 );
192 }
193
194 for participant_peer_id in
195 this.remote_participants.keys().copied().collect::<Vec<_>>()
196 {
197 if !seen_participants.contains(&participant_peer_id) {
198 this.remote_participants.remove(&participant_peer_id);
199 }
200 }
201
202 cx.notify();
203 }
204
205 if let Some(pending_users) = pending_users.log_err() {
206 this.pending_users = pending_users;
207 cx.notify();
208 }
209 });
210 }));
211
212 cx.notify();
213 Ok(())
214 }
215
216 pub fn call(
217 &mut self,
218 recipient_user_id: u64,
219 initial_project_id: Option<u64>,
220 cx: &mut ModelContext<Self>,
221 ) -> Task<Result<()>> {
222 if self.status.is_offline() {
223 return Task::ready(Err(anyhow!("room is offline")));
224 }
225
226 let client = self.client.clone();
227 let room_id = self.id;
228 cx.foreground().spawn(async move {
229 client
230 .request(proto::Call {
231 room_id,
232 recipient_user_id,
233 initial_project_id,
234 })
235 .await?;
236 Ok(())
237 })
238 }
239
240 pub fn set_location(
241 &mut self,
242 project: Option<&ModelHandle<Project>>,
243 cx: &mut ModelContext<Self>,
244 ) -> Task<Result<()>> {
245 if self.status.is_offline() {
246 return Task::ready(Err(anyhow!("room is offline")));
247 }
248
249 let client = self.client.clone();
250 let room_id = self.id;
251 let location = if let Some(project) = project {
252 if let Some(project_id) = project.read(cx).remote_id() {
253 proto::participant_location::Variant::Project(
254 proto::participant_location::Project { id: project_id },
255 )
256 } else {
257 return Task::ready(Err(anyhow!("project is not shared")));
258 }
259 } else {
260 proto::participant_location::Variant::External(proto::participant_location::External {})
261 };
262
263 cx.foreground().spawn(async move {
264 client
265 .request(proto::UpdateParticipantLocation {
266 room_id,
267 location: Some(proto::ParticipantLocation {
268 variant: Some(location),
269 }),
270 })
271 .await?;
272 Ok(())
273 })
274 }
275}
276
/// Whether a room can still be used.
///
/// `Debug` is derived so the status can be logged and used in `assert_eq!`
/// and other diagnostics.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is usable.
    Online,
    /// The room is no longer usable.
    Offline,
}
282
283impl RoomStatus {
284 pub fn is_offline(&self) -> bool {
285 matches!(self, RoomStatus::Offline)
286 }
287}