1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use futures::{future, AsyncReadExt, Future};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{hash_map::Entry, HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
/// A user as known to this client, built from a `proto::User` message.
#[derive(Debug)]
pub struct User {
    /// Numeric user id; `UserStore` caches users under this key.
    pub id: u64,
    /// GitHub login name; the contact lists in `UserStore` are ordered by it.
    pub github_login: String,
    /// Decoded avatar image. `None` presumably means the avatar could not be
    /// fetched or decoded (the fetch error is not propagated).
    pub avatar: Option<Arc<ImageData>>,
}
19
/// A contact of the current user together with that contact's projects.
#[derive(Debug)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// Projects reported for this contact in the `proto::Contact` message.
    pub projects: Vec<ProjectMetadata>,
}
25
/// Summary of one project listed under a contact.
#[derive(Debug)]
pub struct ProjectMetadata {
    /// Project id copied from the protobuf message.
    pub id: u64,
    /// Sharing flag copied from the protobuf message.
    pub is_shared: bool,
    /// Root names of the project's worktrees, as sent by the server.
    pub worktree_root_names: Vec<String>,
    /// Users resolved from the project's guest ids.
    pub guests: Vec<Arc<User>>,
}
33
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy)]
pub enum ContactRequestStatus {
    /// The user is not a contact and no request towards them is known.
    None,
    /// A contact RPC for this user is still in flight.
    SendingRequest,
    /// The user appears in the list of outgoing contact requests.
    Requested,
    /// The user appears in the contact list.
    RequestAccepted,
}
41
/// Caches users and tracks the current user's contacts and contact requests.
pub struct UserStore {
    /// Users loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Holds the most recent `UpdateContacts` message received from the
    /// server; the `_maintain_contacts` task consumes the receiving end.
    update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
    /// The signed-in user; written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept ordered by `github_login` so they can be binary searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent the current user a contact request, ordered by
    /// `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users the current user sent a contact request to, ordered by
    /// `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight `request_contact` / `remove_contact` calls per
    /// user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak reference to the RPC client; upgraded whenever a request is made.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Background task applying incoming contact updates. Presumably stored
    /// so the task is kept alive for the lifetime of the store.
    _maintain_contacts: Task<()>,
    /// Background task tracking the client's connection status. Presumably
    /// stored so the task is kept alive for the lifetime of the store.
    _maintain_current_user: Task<()>,
}
55
/// Event type of `UserStore`; it has no variants.
pub enum Event {}
57
/// Registers `UserStore` as a gpui entity whose event type is `Event`.
impl Entity for UserStore {
    type Event = Event;
}
61
62impl UserStore {
63 pub fn new(
64 client: Arc<Client>,
65 http: Arc<dyn HttpClient>,
66 cx: &mut ModelContext<Self>,
67 ) -> Self {
68 let (mut current_user_tx, current_user_rx) = watch::channel();
69 let (update_contacts_tx, mut update_contacts_rx) =
70 watch::channel::<Option<proto::UpdateContacts>>();
71 let rpc_subscription =
72 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
73 Self {
74 users: Default::default(),
75 current_user: current_user_rx,
76 contacts: Default::default(),
77 incoming_contact_requests: Default::default(),
78 outgoing_contact_requests: Default::default(),
79 client: Arc::downgrade(&client),
80 update_contacts_tx,
81 http,
82 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
83 let _subscription = rpc_subscription;
84 while let Some(message) = update_contacts_rx.recv().await {
85 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
86 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
87 .log_err()
88 .await;
89 }
90 }
91 }),
92 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
93 let mut status = client.status();
94 while let Some(status) = status.recv().await {
95 match status {
96 Status::Connected { .. } => {
97 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
98 let user = this
99 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
100 .log_err()
101 .await;
102 current_user_tx.send(user).await.ok();
103 }
104 }
105 Status::SignedOut => {
106 current_user_tx.send(None).await.ok();
107 }
108 _ => {}
109 }
110 }
111 }),
112 pending_contact_requests: Default::default(),
113 }
114 }
115
116 async fn handle_update_contacts(
117 this: ModelHandle<Self>,
118 msg: TypedEnvelope<proto::UpdateContacts>,
119 _: Arc<Client>,
120 mut cx: AsyncAppContext,
121 ) -> Result<()> {
122 this.update(&mut cx, |this, _| {
123 *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
124 });
125 Ok(())
126 }
127
128 fn update_contacts(
129 &mut self,
130 message: proto::UpdateContacts,
131 cx: &mut ModelContext<Self>,
132 ) -> Task<Result<()>> {
133 log::info!("update contacts on client {:?}", message);
134 let mut user_ids = HashSet::new();
135 for contact in &message.contacts {
136 user_ids.insert(contact.user_id);
137 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
138 }
139 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
140 user_ids.extend(message.outgoing_requests.iter());
141
142 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
143 cx.spawn(|this, mut cx| async move {
144 load_users.await?;
145
146 // Users are fetched in parallel above and cached in call to get_users
147 // No need to paralellize here
148 let mut updated_contacts = Vec::new();
149 for contact in message.contacts {
150 updated_contacts.push(Arc::new(
151 Contact::from_proto(contact, &this, &mut cx).await?,
152 ));
153 }
154
155 let mut incoming_requests = Vec::new();
156 for request in message.incoming_requests {
157 incoming_requests.push(
158 this.update(&mut cx, |this, cx| {
159 this.fetch_user(request.requester_id, cx)
160 })
161 .await?,
162 );
163 }
164
165 let mut outgoing_requests = Vec::new();
166 for requested_user_id in message.outgoing_requests {
167 outgoing_requests.push(
168 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
169 .await?,
170 );
171 }
172
173 let removed_contacts =
174 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
175 let removed_incoming_requests =
176 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
177 let removed_outgoing_requests =
178 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
179
180 this.update(&mut cx, |this, cx| {
181 // Remove contacts
182 this.contacts
183 .retain(|contact| !removed_contacts.contains(&contact.user.id));
184 // Update existing contacts and insert new ones
185 for updated_contact in updated_contacts {
186 match this
187 .contacts
188 .binary_search_by_key(&&updated_contact.user.github_login, |contact| {
189 &contact.user.github_login
190 }) {
191 Ok(ix) => this.contacts[ix] = updated_contact,
192 Err(ix) => this.contacts.insert(ix, updated_contact),
193 }
194 }
195 cx.notify();
196
197 // Remove incoming contact requests
198 this.incoming_contact_requests
199 .retain(|user| !removed_incoming_requests.contains(&user.id));
200 // Update existing incoming requests and insert new ones
201 for request in incoming_requests {
202 match this
203 .incoming_contact_requests
204 .binary_search_by_key(&&request.github_login, |contact| {
205 &contact.github_login
206 }) {
207 Ok(ix) => this.incoming_contact_requests[ix] = request,
208 Err(ix) => this.incoming_contact_requests.insert(ix, request),
209 }
210 }
211
212 // Remove outgoing contact requests
213 this.outgoing_contact_requests
214 .retain(|user| !removed_outgoing_requests.contains(&user.id));
215 // Update existing incoming requests and insert new ones
216 for request in outgoing_requests {
217 match this
218 .outgoing_contact_requests
219 .binary_search_by_key(&&request.github_login, |contact| {
220 &contact.github_login
221 }) {
222 Ok(ix) => this.outgoing_contact_requests[ix] = request,
223 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
224 }
225 }
226 });
227
228 Ok(())
229 })
230 }
231
232 pub fn contacts(&self) -> &[Arc<Contact>] {
233 &self.contacts
234 }
235
236 pub fn has_contact(&self, user: &Arc<User>) -> bool {
237 self.contacts
238 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
239 .is_ok()
240 }
241
242 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
243 &self.incoming_contact_requests
244 }
245
246 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
247 &self.outgoing_contact_requests
248 }
249
250 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
251 if self
252 .contacts
253 .binary_search_by_key(&&user.id, |contact| &contact.user.id)
254 .is_ok()
255 {
256 ContactRequestStatus::RequestAccepted
257 } else if self
258 .outgoing_contact_requests
259 .binary_search_by_key(&&user.id, |user| &user.id)
260 .is_ok()
261 {
262 ContactRequestStatus::Requested
263 } else if self.pending_contact_requests.contains_key(&user.id) {
264 ContactRequestStatus::SendingRequest
265 } else {
266 ContactRequestStatus::None
267 }
268 }
269
270 pub fn request_contact(
271 &mut self,
272 responder_id: u64,
273 cx: &mut ModelContext<Self>,
274 ) -> Task<Result<()>> {
275 let client = self.client.upgrade();
276 *self
277 .pending_contact_requests
278 .entry(responder_id)
279 .or_insert(0) += 1;
280 cx.notify();
281
282 cx.spawn(|this, mut cx| async move {
283 let request = client
284 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
285 .request(proto::RequestContact { responder_id });
286 request.await?;
287 this.update(&mut cx, |this, cx| {
288 if let Entry::Occupied(mut request_count) =
289 this.pending_contact_requests.entry(responder_id)
290 {
291 *request_count.get_mut() -= 1;
292 if *request_count.get() == 0 {
293 request_count.remove();
294 }
295 }
296 cx.notify();
297 });
298 Ok(())
299 })
300 }
301
302 pub fn remove_contact(
303 &mut self,
304 user_id: u64,
305 cx: &mut ModelContext<Self>,
306 ) -> Task<Result<()>> {
307 let client = self.client.upgrade();
308 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
309 cx.notify();
310
311 cx.spawn(|this, mut cx| async move {
312 let request = client
313 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
314 .request(proto::RemoveContact { user_id });
315 request.await?;
316 this.update(&mut cx, |this, cx| {
317 if let Entry::Occupied(mut request_count) =
318 this.pending_contact_requests.entry(user_id)
319 {
320 *request_count.get_mut() -= 1;
321 if *request_count.get() == 0 {
322 request_count.remove();
323 }
324 }
325 cx.notify();
326 });
327 Ok(())
328 })
329 }
330
331 pub fn respond_to_contact_request(
332 &self,
333 requester_id: u64,
334 accept: bool,
335 ) -> impl Future<Output = Result<()>> {
336 let client = self.client.upgrade();
337 async move {
338 client
339 .ok_or_else(|| anyhow!("not logged in"))?
340 .request(proto::RespondToContactRequest {
341 requester_id,
342 response: if accept {
343 proto::ContactRequestResponse::Accept
344 } else {
345 proto::ContactRequestResponse::Reject
346 } as i32,
347 })
348 .await?;
349 Ok(())
350 }
351 }
352
353 #[cfg(any(test, feature = "test-support"))]
354 pub fn clear_contacts(&mut self) {
355 self.contacts.clear();
356 self.incoming_contact_requests.clear();
357 self.outgoing_contact_requests.clear();
358 }
359
360 pub fn get_users(
361 &mut self,
362 mut user_ids: Vec<u64>,
363 cx: &mut ModelContext<Self>,
364 ) -> Task<Result<Vec<Arc<User>>>> {
365 user_ids.retain(|id| !self.users.contains_key(id));
366 self.load_users(proto::GetUsers { user_ids }, cx)
367 }
368
369 pub fn fuzzy_search_users(
370 &mut self,
371 query: String,
372 cx: &mut ModelContext<Self>,
373 ) -> Task<Result<Vec<Arc<User>>>> {
374 self.load_users(proto::FuzzySearchUsers { query }, cx)
375 }
376
377 pub fn fetch_user(
378 &mut self,
379 user_id: u64,
380 cx: &mut ModelContext<Self>,
381 ) -> Task<Result<Arc<User>>> {
382 if let Some(user) = self.users.get(&user_id).cloned() {
383 return cx.foreground().spawn(async move { Ok(user) });
384 }
385
386 let load_users = self.get_users(vec![user_id], cx);
387 cx.spawn(|this, mut cx| async move {
388 load_users.await?;
389 this.update(&mut cx, |this, _| {
390 this.users
391 .get(&user_id)
392 .cloned()
393 .ok_or_else(|| anyhow!("server responded with no users"))
394 })
395 })
396 }
397
398 pub fn current_user(&self) -> Option<Arc<User>> {
399 self.current_user.borrow().clone()
400 }
401
402 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
403 self.current_user.clone()
404 }
405
406 fn load_users(
407 &mut self,
408 request: impl RequestMessage<Response = UsersResponse>,
409 cx: &mut ModelContext<Self>,
410 ) -> Task<Result<Vec<Arc<User>>>> {
411 let client = self.client.clone();
412 let http = self.http.clone();
413 cx.spawn_weak(|this, mut cx| async move {
414 if let Some(rpc) = client.upgrade() {
415 let response = rpc.request(request).await.context("error loading users")?;
416 let users = future::join_all(
417 response
418 .users
419 .into_iter()
420 .map(|user| User::new(user, http.as_ref())),
421 )
422 .await;
423
424 if let Some(this) = this.upgrade(&cx) {
425 this.update(&mut cx, |this, _| {
426 for user in &users {
427 this.users.insert(user.id, user.clone());
428 }
429 });
430 }
431 Ok(users)
432 } else {
433 Ok(Vec::new())
434 }
435 })
436 }
437}
438
439impl User {
440 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
441 Arc::new(User {
442 id: message.id,
443 github_login: message.github_login,
444 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
445 })
446 }
447}
448
449impl Contact {
450 async fn from_proto(
451 contact: proto::Contact,
452 user_store: &ModelHandle<UserStore>,
453 cx: &mut AsyncAppContext,
454 ) -> Result<Self> {
455 let user = user_store
456 .update(cx, |user_store, cx| {
457 user_store.fetch_user(contact.user_id, cx)
458 })
459 .await?;
460 let mut projects = Vec::new();
461 for project in contact.projects {
462 let mut guests = Vec::new();
463 for participant_id in project.guests {
464 guests.push(
465 user_store
466 .update(cx, |user_store, cx| {
467 user_store.fetch_user(participant_id, cx)
468 })
469 .await?,
470 );
471 }
472 projects.push(ProjectMetadata {
473 id: project.id,
474 worktree_root_names: project.worktree_root_names.clone(),
475 is_shared: project.is_shared,
476 guests,
477 });
478 }
479 Ok(Self { user, projects })
480 }
481}
482
483async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
484 let mut response = http
485 .get(url, Default::default(), true)
486 .await
487 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
488
489 if !response.status().is_success() {
490 return Err(anyhow!("avatar request failed {:?}", response.status()));
491 }
492
493 let mut body = Vec::new();
494 response
495 .body_mut()
496 .read_to_end(&mut body)
497 .await
498 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
499 let format = image::guess_format(&body)?;
500 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
501 Ok(ImageData::new(image))
502}