1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use settings::Settings;
9use std::sync::{Arc, Weak};
10use util::{StaffMode, TryFutureExt as _};
11
12#[derive(Default, Debug)]
13pub struct User {
14 pub id: u64,
15 pub github_login: String,
16 pub avatar: Option<Arc<ImageData>>,
17}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
37impl Eq for User {}
38
39#[derive(Debug, PartialEq)]
40pub struct Contact {
41 pub user: Arc<User>,
42 pub online: bool,
43 pub busy: bool,
44}
45
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any known contact request.
    None,
    /// The user is in the list of outgoing contact requests.
    RequestSent,
    /// The user is in the list of incoming contact requests.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
53
54pub struct UserStore {
55 users: HashMap<u64, Arc<User>>,
56 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
57 current_user: watch::Receiver<Option<Arc<User>>>,
58 contacts: Vec<Arc<Contact>>,
59 incoming_contact_requests: Vec<Arc<User>>,
60 outgoing_contact_requests: Vec<Arc<User>>,
61 pending_contact_requests: HashMap<u64, usize>,
62 invite_info: Option<InviteInfo>,
63 client: Weak<Client>,
64 http: Arc<dyn HttpClient>,
65 _maintain_contacts: Task<()>,
66 _maintain_current_user: Task<()>,
67}
68
/// Invite details most recently pushed by the server.
///
/// Derives `Debug` so the value can be logged and used in assertions like other public types.
#[derive(Clone, Debug)]
pub struct InviteInfo {
    /// `count` field of the `proto::UpdateInviteInfo` message this value was built from.
    pub count: u32,
    /// `url` field of the `proto::UpdateInviteInfo` message this value was built from.
    pub url: Arc<str>,
}
74
75pub enum Event {
76 Contact {
77 user: Arc<User>,
78 kind: ContactEventKind,
79 },
80 ShowContacts,
81}
82
/// The kind of change reported by `Event::Contact`.
///
/// Derives `Debug`, `PartialEq` and `Eq` so event kinds can be logged and compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContactEventKind {
    /// An incoming contact request flagged `should_notify` was received.
    Requested,
    /// A contact flagged `should_notify` was added or updated.
    Accepted,
    /// A previously known incoming contact request was removed.
    Cancelled,
}
89
90impl Entity for UserStore {
91 type Event = Event;
92}
93
94enum UpdateContacts {
95 Update(proto::UpdateContacts),
96 Wait(postage::barrier::Sender),
97 Clear(postage::barrier::Sender),
98}
99
100impl UserStore {
101 pub fn new(
102 client: Arc<Client>,
103 http: Arc<dyn HttpClient>,
104 cx: &mut ModelContext<Self>,
105 ) -> Self {
106 let (mut current_user_tx, current_user_rx) = watch::channel();
107 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
108 let rpc_subscriptions = vec![
109 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
110 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
111 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
112 ];
113 Self {
114 users: Default::default(),
115 current_user: current_user_rx,
116 contacts: Default::default(),
117 incoming_contact_requests: Default::default(),
118 outgoing_contact_requests: Default::default(),
119 invite_info: None,
120 client: Arc::downgrade(&client),
121 update_contacts_tx,
122 http,
123 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
124 let _subscriptions = rpc_subscriptions;
125 while let Some(message) = update_contacts_rx.next().await {
126 if let Some(this) = this.upgrade(&cx) {
127 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
128 .log_err()
129 .await;
130 }
131 }
132 }),
133 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
134 let mut status = client.status();
135 while let Some(status) = status.next().await {
136 match status {
137 Status::Connected { .. } => {
138 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
139 let fetch_user = this
140 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
141 .log_err();
142 let fetch_metrics_id =
143 client.request(proto::GetPrivateUserInfo {}).log_err();
144 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
145 client.telemetry.set_authenticated_user_info(
146 info.as_ref().map(|info| info.metrics_id.clone()),
147 info.as_ref().map(|info| info.staff).unwrap_or(false),
148 cx.read(|cx| cx.global::<Settings>().telemetry()),
149 );
150
151 cx.update(|cx| {
152 cx.update_default_global(|staff_mode: &mut StaffMode, _| {
153 if !staff_mode.0 {
154 *staff_mode = StaffMode(
155 info.as_ref()
156 .map(|info| info.staff)
157 .unwrap_or_default(),
158 )
159 }
160 ()
161 });
162 });
163
164 current_user_tx.send(user).await.ok();
165 }
166 }
167 Status::SignedOut => {
168 current_user_tx.send(None).await.ok();
169 if let Some(this) = this.upgrade(&cx) {
170 this.update(&mut cx, |this, _| this.clear_contacts()).await;
171 }
172 }
173 Status::ConnectionLost => {
174 if let Some(this) = this.upgrade(&cx) {
175 this.update(&mut cx, |this, _| this.clear_contacts()).await;
176 }
177 }
178 _ => {}
179 }
180 }
181 }),
182 pending_contact_requests: Default::default(),
183 }
184 }
185
186 #[cfg(feature = "test-support")]
187 pub fn clear_cache(&mut self) {
188 self.users.clear();
189 }
190
191 async fn handle_update_invite_info(
192 this: ModelHandle<Self>,
193 message: TypedEnvelope<proto::UpdateInviteInfo>,
194 _: Arc<Client>,
195 mut cx: AsyncAppContext,
196 ) -> Result<()> {
197 this.update(&mut cx, |this, cx| {
198 this.invite_info = Some(InviteInfo {
199 url: Arc::from(message.payload.url),
200 count: message.payload.count,
201 });
202 cx.notify();
203 });
204 Ok(())
205 }
206
207 async fn handle_show_contacts(
208 this: ModelHandle<Self>,
209 _: TypedEnvelope<proto::ShowContacts>,
210 _: Arc<Client>,
211 mut cx: AsyncAppContext,
212 ) -> Result<()> {
213 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
214 Ok(())
215 }
216
217 pub fn invite_info(&self) -> Option<&InviteInfo> {
218 self.invite_info.as_ref()
219 }
220
221 async fn handle_update_contacts(
222 this: ModelHandle<Self>,
223 message: TypedEnvelope<proto::UpdateContacts>,
224 _: Arc<Client>,
225 mut cx: AsyncAppContext,
226 ) -> Result<()> {
227 this.update(&mut cx, |this, _| {
228 this.update_contacts_tx
229 .unbounded_send(UpdateContacts::Update(message.payload))
230 .unwrap();
231 });
232 Ok(())
233 }
234
235 fn update_contacts(
236 &mut self,
237 message: UpdateContacts,
238 cx: &mut ModelContext<Self>,
239 ) -> Task<Result<()>> {
240 match message {
241 UpdateContacts::Wait(barrier) => {
242 drop(barrier);
243 Task::ready(Ok(()))
244 }
245 UpdateContacts::Clear(barrier) => {
246 self.contacts.clear();
247 self.incoming_contact_requests.clear();
248 self.outgoing_contact_requests.clear();
249 drop(barrier);
250 Task::ready(Ok(()))
251 }
252 UpdateContacts::Update(message) => {
253 let mut user_ids = HashSet::default();
254 for contact in &message.contacts {
255 user_ids.insert(contact.user_id);
256 }
257 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
258 user_ids.extend(message.outgoing_requests.iter());
259
260 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
261 cx.spawn(|this, mut cx| async move {
262 load_users.await?;
263
264 // Users are fetched in parallel above and cached in call to get_users
265 // No need to paralellize here
266 let mut updated_contacts = Vec::new();
267 for contact in message.contacts {
268 let should_notify = contact.should_notify;
269 updated_contacts.push((
270 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
271 should_notify,
272 ));
273 }
274
275 let mut incoming_requests = Vec::new();
276 for request in message.incoming_requests {
277 incoming_requests.push({
278 let user = this
279 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
280 .await?;
281 (user, request.should_notify)
282 });
283 }
284
285 let mut outgoing_requests = Vec::new();
286 for requested_user_id in message.outgoing_requests {
287 outgoing_requests.push(
288 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
289 .await?,
290 );
291 }
292
293 let removed_contacts =
294 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
295 let removed_incoming_requests =
296 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
297 let removed_outgoing_requests =
298 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
299
300 this.update(&mut cx, |this, cx| {
301 // Remove contacts
302 this.contacts
303 .retain(|contact| !removed_contacts.contains(&contact.user.id));
304 // Update existing contacts and insert new ones
305 for (updated_contact, should_notify) in updated_contacts {
306 if should_notify {
307 cx.emit(Event::Contact {
308 user: updated_contact.user.clone(),
309 kind: ContactEventKind::Accepted,
310 });
311 }
312 match this.contacts.binary_search_by_key(
313 &&updated_contact.user.github_login,
314 |contact| &contact.user.github_login,
315 ) {
316 Ok(ix) => this.contacts[ix] = updated_contact,
317 Err(ix) => this.contacts.insert(ix, updated_contact),
318 }
319 }
320
321 // Remove incoming contact requests
322 this.incoming_contact_requests.retain(|user| {
323 if removed_incoming_requests.contains(&user.id) {
324 cx.emit(Event::Contact {
325 user: user.clone(),
326 kind: ContactEventKind::Cancelled,
327 });
328 false
329 } else {
330 true
331 }
332 });
333 // Update existing incoming requests and insert new ones
334 for (user, should_notify) in incoming_requests {
335 if should_notify {
336 cx.emit(Event::Contact {
337 user: user.clone(),
338 kind: ContactEventKind::Requested,
339 });
340 }
341
342 match this
343 .incoming_contact_requests
344 .binary_search_by_key(&&user.github_login, |contact| {
345 &contact.github_login
346 }) {
347 Ok(ix) => this.incoming_contact_requests[ix] = user,
348 Err(ix) => this.incoming_contact_requests.insert(ix, user),
349 }
350 }
351
352 // Remove outgoing contact requests
353 this.outgoing_contact_requests
354 .retain(|user| !removed_outgoing_requests.contains(&user.id));
355 // Update existing incoming requests and insert new ones
356 for request in outgoing_requests {
357 match this
358 .outgoing_contact_requests
359 .binary_search_by_key(&&request.github_login, |contact| {
360 &contact.github_login
361 }) {
362 Ok(ix) => this.outgoing_contact_requests[ix] = request,
363 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
364 }
365 }
366
367 cx.notify();
368 });
369
370 Ok(())
371 })
372 }
373 }
374 }
375
376 pub fn contacts(&self) -> &[Arc<Contact>] {
377 &self.contacts
378 }
379
380 pub fn has_contact(&self, user: &Arc<User>) -> bool {
381 self.contacts
382 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
383 .is_ok()
384 }
385
386 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
387 &self.incoming_contact_requests
388 }
389
390 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
391 &self.outgoing_contact_requests
392 }
393
394 pub fn is_contact_request_pending(&self, user: &User) -> bool {
395 self.pending_contact_requests.contains_key(&user.id)
396 }
397
398 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
399 if self
400 .contacts
401 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
402 .is_ok()
403 {
404 ContactRequestStatus::RequestAccepted
405 } else if self
406 .outgoing_contact_requests
407 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
408 .is_ok()
409 {
410 ContactRequestStatus::RequestSent
411 } else if self
412 .incoming_contact_requests
413 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
414 .is_ok()
415 {
416 ContactRequestStatus::RequestReceived
417 } else {
418 ContactRequestStatus::None
419 }
420 }
421
422 pub fn request_contact(
423 &mut self,
424 responder_id: u64,
425 cx: &mut ModelContext<Self>,
426 ) -> Task<Result<()>> {
427 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
428 }
429
430 pub fn remove_contact(
431 &mut self,
432 user_id: u64,
433 cx: &mut ModelContext<Self>,
434 ) -> Task<Result<()>> {
435 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
436 }
437
438 pub fn respond_to_contact_request(
439 &mut self,
440 requester_id: u64,
441 accept: bool,
442 cx: &mut ModelContext<Self>,
443 ) -> Task<Result<()>> {
444 self.perform_contact_request(
445 requester_id,
446 proto::RespondToContactRequest {
447 requester_id,
448 response: if accept {
449 proto::ContactRequestResponse::Accept
450 } else {
451 proto::ContactRequestResponse::Decline
452 } as i32,
453 },
454 cx,
455 )
456 }
457
458 pub fn dismiss_contact_request(
459 &mut self,
460 requester_id: u64,
461 cx: &mut ModelContext<Self>,
462 ) -> Task<Result<()>> {
463 let client = self.client.upgrade();
464 cx.spawn_weak(|_, _| async move {
465 client
466 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
467 .request(proto::RespondToContactRequest {
468 requester_id,
469 response: proto::ContactRequestResponse::Dismiss as i32,
470 })
471 .await?;
472 Ok(())
473 })
474 }
475
476 fn perform_contact_request<T: RequestMessage>(
477 &mut self,
478 user_id: u64,
479 request: T,
480 cx: &mut ModelContext<Self>,
481 ) -> Task<Result<()>> {
482 let client = self.client.upgrade();
483 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
484 cx.notify();
485
486 cx.spawn(|this, mut cx| async move {
487 let response = client
488 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
489 .request(request)
490 .await;
491 this.update(&mut cx, |this, cx| {
492 if let Entry::Occupied(mut request_count) =
493 this.pending_contact_requests.entry(user_id)
494 {
495 *request_count.get_mut() -= 1;
496 if *request_count.get() == 0 {
497 request_count.remove();
498 }
499 }
500 cx.notify();
501 });
502 response?;
503 Ok(())
504 })
505 }
506
507 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
508 let (tx, mut rx) = postage::barrier::channel();
509 self.update_contacts_tx
510 .unbounded_send(UpdateContacts::Clear(tx))
511 .unwrap();
512 async move {
513 rx.next().await;
514 }
515 }
516
517 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
518 let (tx, mut rx) = postage::barrier::channel();
519 self.update_contacts_tx
520 .unbounded_send(UpdateContacts::Wait(tx))
521 .unwrap();
522 async move {
523 rx.next().await;
524 }
525 }
526
527 pub fn get_users(
528 &mut self,
529 user_ids: Vec<u64>,
530 cx: &mut ModelContext<Self>,
531 ) -> Task<Result<Vec<Arc<User>>>> {
532 let mut user_ids_to_fetch = user_ids.clone();
533 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
534
535 cx.spawn(|this, mut cx| async move {
536 if !user_ids_to_fetch.is_empty() {
537 this.update(&mut cx, |this, cx| {
538 this.load_users(
539 proto::GetUsers {
540 user_ids: user_ids_to_fetch,
541 },
542 cx,
543 )
544 })
545 .await?;
546 }
547
548 this.read_with(&cx, |this, _| {
549 user_ids
550 .iter()
551 .map(|user_id| {
552 this.users
553 .get(user_id)
554 .cloned()
555 .ok_or_else(|| anyhow!("user {} not found", user_id))
556 })
557 .collect()
558 })
559 })
560 }
561
562 pub fn fuzzy_search_users(
563 &mut self,
564 query: String,
565 cx: &mut ModelContext<Self>,
566 ) -> Task<Result<Vec<Arc<User>>>> {
567 self.load_users(proto::FuzzySearchUsers { query }, cx)
568 }
569
570 pub fn get_user(
571 &mut self,
572 user_id: u64,
573 cx: &mut ModelContext<Self>,
574 ) -> Task<Result<Arc<User>>> {
575 if let Some(user) = self.users.get(&user_id).cloned() {
576 return cx.foreground().spawn(async move { Ok(user) });
577 }
578
579 let load_users = self.get_users(vec![user_id], cx);
580 cx.spawn(|this, mut cx| async move {
581 load_users.await?;
582 this.update(&mut cx, |this, _| {
583 this.users
584 .get(&user_id)
585 .cloned()
586 .ok_or_else(|| anyhow!("server responded with no users"))
587 })
588 })
589 }
590
591 pub fn current_user(&self) -> Option<Arc<User>> {
592 self.current_user.borrow().clone()
593 }
594
595 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
596 self.current_user.clone()
597 }
598
599 fn load_users(
600 &mut self,
601 request: impl RequestMessage<Response = UsersResponse>,
602 cx: &mut ModelContext<Self>,
603 ) -> Task<Result<Vec<Arc<User>>>> {
604 let client = self.client.clone();
605 let http = self.http.clone();
606 cx.spawn_weak(|this, mut cx| async move {
607 if let Some(rpc) = client.upgrade() {
608 let response = rpc.request(request).await.context("error loading users")?;
609 let users = future::join_all(
610 response
611 .users
612 .into_iter()
613 .map(|user| User::new(user, http.as_ref())),
614 )
615 .await;
616
617 if let Some(this) = this.upgrade(&cx) {
618 this.update(&mut cx, |this, _| {
619 for user in &users {
620 this.users.insert(user.id, user.clone());
621 }
622 });
623 }
624 Ok(users)
625 } else {
626 Ok(Vec::new())
627 }
628 })
629 }
630}
631
632impl User {
633 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
634 Arc::new(User {
635 id: message.id,
636 github_login: message.github_login,
637 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
638 })
639 }
640}
641
642impl Contact {
643 async fn from_proto(
644 contact: proto::Contact,
645 user_store: &ModelHandle<UserStore>,
646 cx: &mut AsyncAppContext,
647 ) -> Result<Self> {
648 let user = user_store
649 .update(cx, |user_store, cx| {
650 user_store.get_user(contact.user_id, cx)
651 })
652 .await?;
653 Ok(Self {
654 user,
655 online: contact.online,
656 busy: contact.busy,
657 })
658 }
659}
660
661async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
662 let mut response = http
663 .get(url, Default::default(), true)
664 .await
665 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
666
667 if !response.status().is_success() {
668 return Err(anyhow!("avatar request failed {:?}", response.status()));
669 }
670
671 let mut body = Vec::new();
672 response
673 .body_mut()
674 .read_to_end(&mut body)
675 .await
676 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
677 let format = image::guess_format(&body)?;
678 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
679 Ok(ImageData::new(image))
680}