1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use staff_mode::StaffMode;
9use std::sync::{Arc, Weak};
10use util::http::HttpClient;
11use util::TryFutureExt as _;
12
/// A user account as known to this client.
#[derive(Default, Debug)]
pub struct User {
 /// Numeric identifier of the user.
 pub id: u64,
 /// The user's GitHub login name.
 pub github_login: String,
 /// Decoded avatar image, when one was loaded for this user.
 pub avatar: Option<Arc<ImageData>>,
}
19
20impl PartialOrd for User {
21 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
22 Some(self.cmp(other))
23 }
24}
25
26impl Ord for User {
27 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28 self.github_login.cmp(&other.github_login)
29 }
30}
31
32impl PartialEq for User {
33 fn eq(&self, other: &Self) -> bool {
34 self.id == other.id && self.github_login == other.github_login
35 }
36}
37
// Equality compares an integer id and a string login, both of which are
// reflexive, so the marker trait `Eq` is sound here.
impl Eq for User {}
39
/// An entry of the contact list: a user together with presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
 /// The user this contact refers to.
 pub user: Arc<User>,
 /// `online` flag copied from the `proto::Contact` message.
 pub online: bool,
 /// `busy` flag copied from the `proto::Contact` message.
 pub busy: bool,
}
46
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
 /// The user is neither a contact nor part of any contact request.
 None,
 /// The user is listed in the outgoing contact requests.
 RequestSent,
 /// The user is listed in the incoming contact requests.
 RequestReceived,
 /// The user is listed in the contacts.
 RequestAccepted,
}
54
/// Model holding the users known to the client, the contact list, and the
/// pending contact requests.
pub struct UserStore {
 // Cache of loaded users, keyed by user id.
 users: HashMap<u64, Arc<User>>,
 // Sending side of the queue drained by `_maintain_contacts`.
 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 // Receives the current user published by `_maintain_current_user`.
 current_user: watch::Receiver<Option<Arc<User>>>,
 // Contacts, kept sorted by GitHub login (lookups use binary search).
 contacts: Vec<Arc<Contact>>,
 // Users that sent us a contact request, kept sorted by GitHub login.
 incoming_contact_requests: Vec<Arc<User>>,
 // Users we sent a contact request to, kept sorted by GitHub login.
 outgoing_contact_requests: Vec<Arc<User>>,
 // Number of in-flight contact RPCs per user id.
 pending_contact_requests: HashMap<u64, usize>,
 // Invite information from the latest `UpdateInviteInfo` message, if any.
 invite_info: Option<InviteInfo>,
 // Weak reference to the RPC client; upgraded whenever a request is sent.
 client: Weak<Client>,
 // HTTP client handed to `User::new` for fetching avatars.
 http: Arc<dyn HttpClient>,
 // Task applying queued `UpdateContacts` messages one at a time.
 _maintain_contacts: Task<()>,
 // Task reacting to client status changes.
 _maintain_current_user: Task<()>,
}
69
/// Invite information received through an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
 /// `count` field of the message (presumably the number of invites — confirm
 /// against the server).
 pub count: u32,
 /// `url` field of the message.
 pub url: Arc<str>,
}
75
/// Events emitted by `UserStore`.
pub enum Event {
 /// A contact or contact request changed; emitted while applying a contacts
 /// update.
 Contact {
 /// The user the event is about.
 user: Arc<User>,
 /// What happened to the contact or request.
 kind: ContactEventKind,
 },
 /// Emitted when a `ShowContacts` message is received.
 ShowContacts,
}
83
/// The kind of change reported by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
 /// An incoming contact request flagged `should_notify` was received.
 Requested,
 /// A contact flagged `should_notify` was added or updated.
 Accepted,
 /// An incoming contact request was removed.
 Cancelled,
}
90
// Makes `UserStore` usable as a gpui model that emits `Event` values.
impl Entity for UserStore {
 type Event = Event;
}
94
// Messages processed in order by the `_maintain_contacts` task.
enum UpdateContacts {
 // Apply a contacts update received from the server.
 Update(proto::UpdateContacts),
 // Do nothing except drop the barrier sender once this message is reached.
 Wait(postage::barrier::Sender),
 // Clear contacts and both request lists, then drop the barrier sender.
 Clear(postage::barrier::Sender),
}
100
101impl UserStore {
 /// Creates the store, registers its message handlers on `client`, and spawns
 /// the two tasks that keep the contacts and the current user up to date.
 ///
 /// `http` is stored and later passed to `User::new` when users are loaded.
 pub fn new(
 client: Arc<Client>,
 http: Arc<dyn HttpClient>,
 cx: &mut ModelContext<Self>,
 ) -> Self {
 let (mut current_user_tx, current_user_rx) = watch::channel();
 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 let rpc_subscriptions = vec![
 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
 ];
 Self {
 users: Default::default(),
 current_user: current_user_rx,
 contacts: Default::default(),
 incoming_contact_requests: Default::default(),
 outgoing_contact_requests: Default::default(),
 invite_info: None,
 client: Arc::downgrade(&client),
 update_contacts_tx,
 http,
 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
 // The subscriptions are moved into the task so they live as long as it does.
 let _subscriptions = rpc_subscriptions;
 // Messages are applied one at a time, in the order they were queued.
 while let Some(message) = update_contacts_rx.next().await {
 if let Some(this) = this.upgrade(&cx) {
 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
 .log_err()
 .await;
 }
 }
 }),
 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 let mut status = client.status();
 while let Some(status) = status.next().await {
 match status {
 Status::Connected { .. } => {
 // Proceed only if the store is still alive and the client has a user id.
 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
 let fetch_user = this
 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
 .log_err();
 let fetch_metrics_id =
 client.request(proto::GetPrivateUserInfo {}).log_err();
 // Both requests are awaited together; each yields `None` on failure.
 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
 cx.read(|cx| {
 client.telemetry.set_authenticated_user_info(
 info.as_ref().map(|info| info.metrics_id.clone()),
 info.as_ref().map(|info| info.staff).unwrap_or(false),
 cx,
 )
 });

 cx.update(|cx| {
 cx.update_default_global(|staff_mode: &mut StaffMode, _| {
 // Staff mode is only ever switched on here, never off.
 if !staff_mode.0 {
 *staff_mode = StaffMode(
 info.as_ref()
 .map(|info| info.staff)
 .unwrap_or_default(),
 )
 }
 ()
 });
 });

 current_user_tx.send(user).await.ok();

 this.update(&mut cx, |_, cx| {
 cx.notify();
 });
 }
 }
 Status::SignedOut => {
 // Signing out resets the current user and clears the contacts.
 current_user_tx.send(None).await.ok();
 if let Some(this) = this.upgrade(&cx) {
 this.update(&mut cx, |this, cx| {
 cx.notify();
 this.clear_contacts()
 })
 .await;
 }
 }
 Status::ConnectionLost => {
 // A lost connection clears the contacts but keeps the current user.
 if let Some(this) = this.upgrade(&cx) {
 this.update(&mut cx, |this, cx| {
 cx.notify();
 this.clear_contacts()
 })
 .await;
 }
 }
 _ => {}
 }
 }
 }),
 pending_contact_requests: Default::default(),
 }
 }
200
 /// Empties the user cache. Only compiled with the `test-support` feature.
 #[cfg(feature = "test-support")]
 pub fn clear_cache(&mut self) {
 self.users.clear();
 }
205
206 async fn handle_update_invite_info(
207 this: ModelHandle<Self>,
208 message: TypedEnvelope<proto::UpdateInviteInfo>,
209 _: Arc<Client>,
210 mut cx: AsyncAppContext,
211 ) -> Result<()> {
212 this.update(&mut cx, |this, cx| {
213 this.invite_info = Some(InviteInfo {
214 url: Arc::from(message.payload.url),
215 count: message.payload.count,
216 });
217 cx.notify();
218 });
219 Ok(())
220 }
221
222 async fn handle_show_contacts(
223 this: ModelHandle<Self>,
224 _: TypedEnvelope<proto::ShowContacts>,
225 _: Arc<Client>,
226 mut cx: AsyncAppContext,
227 ) -> Result<()> {
228 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
229 Ok(())
230 }
231
 /// Returns the stored invite information, or `None` if no
 /// `UpdateInviteInfo` message has been handled yet.
 pub fn invite_info(&self) -> Option<&InviteInfo> {
 self.invite_info.as_ref()
 }
235
236 async fn handle_update_contacts(
237 this: ModelHandle<Self>,
238 message: TypedEnvelope<proto::UpdateContacts>,
239 _: Arc<Client>,
240 mut cx: AsyncAppContext,
241 ) -> Result<()> {
242 this.update(&mut cx, |this, _| {
243 this.update_contacts_tx
244 .unbounded_send(UpdateContacts::Update(message.payload))
245 .unwrap();
246 });
247 Ok(())
248 }
249
 /// Applies one queued `UpdateContacts` message to the store.
 ///
 /// `Wait` and `Clear` complete immediately and drop their barrier sender;
 /// `Update` loads every referenced user and then merges the changes into
 /// the sorted contact and request lists, emitting `Event::Contact` where
 /// applicable.
 fn update_contacts(
 &mut self,
 message: UpdateContacts,
 cx: &mut ModelContext<Self>,
 ) -> Task<Result<()>> {
 match message {
 UpdateContacts::Wait(barrier) => {
 // Dropping the sender releases whoever awaits the barrier's receiver.
 drop(barrier);
 Task::ready(Ok(()))
 }
 UpdateContacts::Clear(barrier) => {
 self.contacts.clear();
 self.incoming_contact_requests.clear();
 self.outgoing_contact_requests.clear();
 drop(barrier);
 Task::ready(Ok(()))
 }
 UpdateContacts::Update(message) => {
 // Collect every user id the message refers to so they load in one request.
 let mut user_ids = HashSet::default();
 for contact in &message.contacts {
 user_ids.insert(contact.user_id);
 }
 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 user_ids.extend(message.outgoing_requests.iter());

 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 cx.spawn(|this, mut cx| async move {
 load_users.await?;

 // Users are fetched in parallel above and cached in call to get_users
 // No need to parallelize here
 let mut updated_contacts = Vec::new();
 for contact in message.contacts {
 let should_notify = contact.should_notify;
 updated_contacts.push((
 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
 should_notify,
 ));
 }

 let mut incoming_requests = Vec::new();
 for request in message.incoming_requests {
 incoming_requests.push({
 let user = this
 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
 .await?;
 (user, request.should_notify)
 });
 }

 let mut outgoing_requests = Vec::new();
 for requested_user_id in message.outgoing_requests {
 outgoing_requests.push(
 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
 .await?,
 );
 }

 let removed_contacts =
 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 let removed_incoming_requests =
 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 let removed_outgoing_requests =
 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

 this.update(&mut cx, |this, cx| {
 // Remove contacts
 this.contacts
 .retain(|contact| !removed_contacts.contains(&contact.user.id));
 // Update existing contacts and insert new ones
 for (updated_contact, should_notify) in updated_contacts {
 if should_notify {
 cx.emit(Event::Contact {
 user: updated_contact.user.clone(),
 kind: ContactEventKind::Accepted,
 });
 }
 // The list stays sorted by GitHub login: replace on a hit,
 // otherwise insert at the position the search reports.
 match this.contacts.binary_search_by_key(
 &&updated_contact.user.github_login,
 |contact| &contact.user.github_login,
 ) {
 Ok(ix) => this.contacts[ix] = updated_contact,
 Err(ix) => this.contacts.insert(ix, updated_contact),
 }
 }

 // Remove incoming contact requests
 this.incoming_contact_requests.retain(|user| {
 if removed_incoming_requests.contains(&user.id) {
 cx.emit(Event::Contact {
 user: user.clone(),
 kind: ContactEventKind::Cancelled,
 });
 false
 } else {
 true
 }
 });
 // Update existing incoming requests and insert new ones
 for (user, should_notify) in incoming_requests {
 if should_notify {
 cx.emit(Event::Contact {
 user: user.clone(),
 kind: ContactEventKind::Requested,
 });
 }

 match this
 .incoming_contact_requests
 .binary_search_by_key(&&user.github_login, |contact| {
 &contact.github_login
 }) {
 Ok(ix) => this.incoming_contact_requests[ix] = user,
 Err(ix) => this.incoming_contact_requests.insert(ix, user),
 }
 }

 // Remove outgoing contact requests
 this.outgoing_contact_requests
 .retain(|user| !removed_outgoing_requests.contains(&user.id));
 // Update existing outgoing requests and insert new ones
 for request in outgoing_requests {
 match this
 .outgoing_contact_requests
 .binary_search_by_key(&&request.github_login, |contact| {
 &contact.github_login
 }) {
 Ok(ix) => this.outgoing_contact_requests[ix] = request,
 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 }
 }

 cx.notify();
 });

 Ok(())
 })
 }
 }
 }
390
391 pub fn contacts(&self) -> &[Arc<Contact>] {
392 &self.contacts
393 }
394
395 pub fn has_contact(&self, user: &Arc<User>) -> bool {
396 self.contacts
397 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
398 .is_ok()
399 }
400
401 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
402 &self.incoming_contact_requests
403 }
404
405 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
406 &self.outgoing_contact_requests
407 }
408
 /// Reports whether at least one contact RPC for `user` is still in flight.
 pub fn is_contact_request_pending(&self, user: &User) -> bool {
 self.pending_contact_requests.contains_key(&user.id)
 }
412
413 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
414 if self
415 .contacts
416 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
417 .is_ok()
418 {
419 ContactRequestStatus::RequestAccepted
420 } else if self
421 .outgoing_contact_requests
422 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
423 .is_ok()
424 {
425 ContactRequestStatus::RequestSent
426 } else if self
427 .incoming_contact_requests
428 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
429 .is_ok()
430 {
431 ContactRequestStatus::RequestReceived
432 } else {
433 ContactRequestStatus::None
434 }
435 }
436
437 pub fn request_contact(
438 &mut self,
439 responder_id: u64,
440 cx: &mut ModelContext<Self>,
441 ) -> Task<Result<()>> {
442 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
443 }
444
445 pub fn remove_contact(
446 &mut self,
447 user_id: u64,
448 cx: &mut ModelContext<Self>,
449 ) -> Task<Result<()>> {
450 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
451 }
452
453 pub fn respond_to_contact_request(
454 &mut self,
455 requester_id: u64,
456 accept: bool,
457 cx: &mut ModelContext<Self>,
458 ) -> Task<Result<()>> {
459 self.perform_contact_request(
460 requester_id,
461 proto::RespondToContactRequest {
462 requester_id,
463 response: if accept {
464 proto::ContactRequestResponse::Accept
465 } else {
466 proto::ContactRequestResponse::Decline
467 } as i32,
468 },
469 cx,
470 )
471 }
472
473 pub fn dismiss_contact_request(
474 &mut self,
475 requester_id: u64,
476 cx: &mut ModelContext<Self>,
477 ) -> Task<Result<()>> {
478 let client = self.client.upgrade();
479 cx.spawn_weak(|_, _| async move {
480 client
481 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
482 .request(proto::RespondToContactRequest {
483 requester_id,
484 response: proto::ContactRequestResponse::Dismiss as i32,
485 })
486 .await?;
487 Ok(())
488 })
489 }
490
491 fn perform_contact_request<T: RequestMessage>(
492 &mut self,
493 user_id: u64,
494 request: T,
495 cx: &mut ModelContext<Self>,
496 ) -> Task<Result<()>> {
497 let client = self.client.upgrade();
498 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
499 cx.notify();
500
501 cx.spawn(|this, mut cx| async move {
502 let response = client
503 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
504 .request(request)
505 .await;
506 this.update(&mut cx, |this, cx| {
507 if let Entry::Occupied(mut request_count) =
508 this.pending_contact_requests.entry(user_id)
509 {
510 *request_count.get_mut() -= 1;
511 if *request_count.get() == 0 {
512 request_count.remove();
513 }
514 }
515 cx.notify();
516 });
517 response?;
518 Ok(())
519 })
520 }
521
522 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
523 let (tx, mut rx) = postage::barrier::channel();
524 self.update_contacts_tx
525 .unbounded_send(UpdateContacts::Clear(tx))
526 .unwrap();
527 async move {
528 rx.next().await;
529 }
530 }
531
532 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
533 let (tx, mut rx) = postage::barrier::channel();
534 self.update_contacts_tx
535 .unbounded_send(UpdateContacts::Wait(tx))
536 .unwrap();
537 async move {
538 rx.next().await;
539 }
540 }
541
542 pub fn get_users(
543 &mut self,
544 user_ids: Vec<u64>,
545 cx: &mut ModelContext<Self>,
546 ) -> Task<Result<Vec<Arc<User>>>> {
547 let mut user_ids_to_fetch = user_ids.clone();
548 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
549
550 cx.spawn(|this, mut cx| async move {
551 if !user_ids_to_fetch.is_empty() {
552 this.update(&mut cx, |this, cx| {
553 this.load_users(
554 proto::GetUsers {
555 user_ids: user_ids_to_fetch,
556 },
557 cx,
558 )
559 })
560 .await?;
561 }
562
563 this.read_with(&cx, |this, _| {
564 user_ids
565 .iter()
566 .map(|user_id| {
567 this.users
568 .get(user_id)
569 .cloned()
570 .ok_or_else(|| anyhow!("user {} not found", user_id))
571 })
572 .collect()
573 })
574 })
575 }
576
577 pub fn fuzzy_search_users(
578 &mut self,
579 query: String,
580 cx: &mut ModelContext<Self>,
581 ) -> Task<Result<Vec<Arc<User>>>> {
582 self.load_users(proto::FuzzySearchUsers { query }, cx)
583 }
584
585 pub fn get_user(
586 &mut self,
587 user_id: u64,
588 cx: &mut ModelContext<Self>,
589 ) -> Task<Result<Arc<User>>> {
590 if let Some(user) = self.users.get(&user_id).cloned() {
591 return cx.foreground().spawn(async move { Ok(user) });
592 }
593
594 let load_users = self.get_users(vec![user_id], cx);
595 cx.spawn(|this, mut cx| async move {
596 load_users.await?;
597 this.update(&mut cx, |this, _| {
598 this.users
599 .get(&user_id)
600 .cloned()
601 .ok_or_else(|| anyhow!("server responded with no users"))
602 })
603 })
604 }
605
 /// Returns the value currently held by the current-user watch channel
 /// (`None` after sign-out).
 pub fn current_user(&self) -> Option<Arc<User>> {
 self.current_user.borrow().clone()
 }
609
 /// Returns a clone of the watch receiver on which the current user is
 /// published.
 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 self.current_user.clone()
 }
613
614 fn load_users(
615 &mut self,
616 request: impl RequestMessage<Response = UsersResponse>,
617 cx: &mut ModelContext<Self>,
618 ) -> Task<Result<Vec<Arc<User>>>> {
619 let client = self.client.clone();
620 let http = self.http.clone();
621 cx.spawn_weak(|this, mut cx| async move {
622 if let Some(rpc) = client.upgrade() {
623 let response = rpc.request(request).await.context("error loading users")?;
624 let users = future::join_all(
625 response
626 .users
627 .into_iter()
628 .map(|user| User::new(user, http.as_ref())),
629 )
630 .await;
631
632 if let Some(this) = this.upgrade(&cx) {
633 this.update(&mut cx, |this, _| {
634 for user in &users {
635 this.users.insert(user.id, user.clone());
636 }
637 });
638 }
639 Ok(users)
640 } else {
641 Ok(Vec::new())
642 }
643 })
644 }
645}
646
647impl User {
648 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
649 Arc::new(User {
650 id: message.id,
651 github_login: message.github_login,
652 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
653 })
654 }
655}
656
657impl Contact {
658 async fn from_proto(
659 contact: proto::Contact,
660 user_store: &ModelHandle<UserStore>,
661 cx: &mut AsyncAppContext,
662 ) -> Result<Self> {
663 let user = user_store
664 .update(cx, |user_store, cx| {
665 user_store.get_user(contact.user_id, cx)
666 })
667 .await?;
668 Ok(Self {
669 user,
670 online: contact.online,
671 busy: contact.busy,
672 })
673 }
674}
675
676async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
677 let mut response = http
678 .get(url, Default::default(), true)
679 .await
680 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
681
682 if !response.status().is_success() {
683 return Err(anyhow!("avatar request failed {:?}", response.status()));
684 }
685
686 let mut body = Vec::new();
687 response
688 .body_mut()
689 .read_to_end(&mut body)
690 .await
691 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
692 let format = image::guess_format(&body)?;
693 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
694 Ok(ImageData::new(image))
695}