1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use settings::Settings;
9use std::sync::{Arc, Weak};
10use util::{StaffMode, TryFutureExt as _};
11
12#[derive(Default, Debug)]
13pub struct User {
14 pub id: u64,
15 pub github_login: String,
16 pub avatar: Option<Arc<ImageData>>,
17}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
37impl Eq for User {}
38
39#[derive(Debug, PartialEq)]
40pub struct Contact {
41 pub user: Arc<User>,
42 pub online: bool,
43 pub busy: bool,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum ContactRequestStatus {
48 None,
49 RequestSent,
50 RequestReceived,
51 RequestAccepted,
52}
53
54pub struct UserStore {
55 users: HashMap<u64, Arc<User>>,
56 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
57 current_user: watch::Receiver<Option<Arc<User>>>,
58 contacts: Vec<Arc<Contact>>,
59 incoming_contact_requests: Vec<Arc<User>>,
60 outgoing_contact_requests: Vec<Arc<User>>,
61 pending_contact_requests: HashMap<u64, usize>,
62 invite_info: Option<InviteInfo>,
63 client: Weak<Client>,
64 http: Arc<dyn HttpClient>,
65 _maintain_contacts: Task<()>,
66 _maintain_current_user: Task<()>,
67}
68
69#[derive(Clone)]
70pub struct InviteInfo {
71 pub count: u32,
72 pub url: Arc<str>,
73}
74
75pub enum Event {
76 Contact {
77 user: Arc<User>,
78 kind: ContactEventKind,
79 },
80 ShowContacts,
81}
82
83#[derive(Clone, Copy)]
84pub enum ContactEventKind {
85 Requested,
86 Accepted,
87 Cancelled,
88}
89
90impl Entity for UserStore {
91 type Event = Event;
92}
93
94enum UpdateContacts {
95 Update(proto::UpdateContacts),
96 Wait(postage::barrier::Sender),
97 Clear(postage::barrier::Sender),
98}
99
100impl UserStore {
101 pub fn new(
102 client: Arc<Client>,
103 http: Arc<dyn HttpClient>,
104 cx: &mut ModelContext<Self>,
105 ) -> Self {
106 let (mut current_user_tx, current_user_rx) = watch::channel();
107 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
108 let rpc_subscriptions = vec![
109 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
110 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
111 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
112 ];
113 Self {
114 users: Default::default(),
115 current_user: current_user_rx,
116 contacts: Default::default(),
117 incoming_contact_requests: Default::default(),
118 outgoing_contact_requests: Default::default(),
119 invite_info: None,
120 client: Arc::downgrade(&client),
121 update_contacts_tx,
122 http,
123 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
124 let _subscriptions = rpc_subscriptions;
125 while let Some(message) = update_contacts_rx.next().await {
126 if let Some(this) = this.upgrade(&cx) {
127 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
128 .log_err()
129 .await;
130 }
131 }
132 }),
133 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
134 let mut status = client.status();
135 while let Some(status) = status.next().await {
136 match status {
137 Status::Connected { .. } => {
138 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
139 let fetch_user = this
140 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
141 .log_err();
142 let fetch_metrics_id =
143 client.request(proto::GetPrivateUserInfo {}).log_err();
144 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
145 client.telemetry.set_authenticated_user_info(
146 info.as_ref().map(|info| info.metrics_id.clone()),
147 info.as_ref().map(|info| info.staff).unwrap_or(false),
148 cx.read(|cx| cx.global::<Settings>().telemetry()),
149 );
150
151 cx.update(|cx| {
152 cx.update_default_global(|staff_mode: &mut StaffMode, _| {
153 if !staff_mode.0 {
154 *staff_mode = StaffMode(
155 info.as_ref()
156 .map(|info| info.staff)
157 .unwrap_or_default(),
158 )
159 }
160 ()
161 });
162 });
163
164 current_user_tx.send(user).await.ok();
165 }
166 }
167 Status::SignedOut => {
168 current_user_tx.send(None).await.ok();
169 if let Some(this) = this.upgrade(&cx) {
170 this.update(&mut cx, |this, _| this.clear_contacts()).await;
171 }
172 }
173 Status::ConnectionLost => {
174 if let Some(this) = this.upgrade(&cx) {
175 this.update(&mut cx, |this, _| this.clear_contacts()).await;
176 }
177 }
178 _ => {}
179 }
180 }
181 }),
182 pending_contact_requests: Default::default(),
183 }
184 }
185
186 async fn handle_update_invite_info(
187 this: ModelHandle<Self>,
188 message: TypedEnvelope<proto::UpdateInviteInfo>,
189 _: Arc<Client>,
190 mut cx: AsyncAppContext,
191 ) -> Result<()> {
192 this.update(&mut cx, |this, cx| {
193 this.invite_info = Some(InviteInfo {
194 url: Arc::from(message.payload.url),
195 count: message.payload.count,
196 });
197 cx.notify();
198 });
199 Ok(())
200 }
201
202 async fn handle_show_contacts(
203 this: ModelHandle<Self>,
204 _: TypedEnvelope<proto::ShowContacts>,
205 _: Arc<Client>,
206 mut cx: AsyncAppContext,
207 ) -> Result<()> {
208 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
209 Ok(())
210 }
211
212 pub fn invite_info(&self) -> Option<&InviteInfo> {
213 self.invite_info.as_ref()
214 }
215
216 async fn handle_update_contacts(
217 this: ModelHandle<Self>,
218 message: TypedEnvelope<proto::UpdateContacts>,
219 _: Arc<Client>,
220 mut cx: AsyncAppContext,
221 ) -> Result<()> {
222 this.update(&mut cx, |this, _| {
223 this.update_contacts_tx
224 .unbounded_send(UpdateContacts::Update(message.payload))
225 .unwrap();
226 });
227 Ok(())
228 }
229
230 fn update_contacts(
231 &mut self,
232 message: UpdateContacts,
233 cx: &mut ModelContext<Self>,
234 ) -> Task<Result<()>> {
235 match message {
236 UpdateContacts::Wait(barrier) => {
237 drop(barrier);
238 Task::ready(Ok(()))
239 }
240 UpdateContacts::Clear(barrier) => {
241 self.contacts.clear();
242 self.incoming_contact_requests.clear();
243 self.outgoing_contact_requests.clear();
244 drop(barrier);
245 Task::ready(Ok(()))
246 }
247 UpdateContacts::Update(message) => {
248 let mut user_ids = HashSet::default();
249 for contact in &message.contacts {
250 user_ids.insert(contact.user_id);
251 }
252 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
253 user_ids.extend(message.outgoing_requests.iter());
254
255 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
256 cx.spawn(|this, mut cx| async move {
257 load_users.await?;
258
259 // Users are fetched in parallel above and cached in call to get_users
260 // No need to paralellize here
261 let mut updated_contacts = Vec::new();
262 for contact in message.contacts {
263 let should_notify = contact.should_notify;
264 updated_contacts.push((
265 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
266 should_notify,
267 ));
268 }
269
270 let mut incoming_requests = Vec::new();
271 for request in message.incoming_requests {
272 incoming_requests.push({
273 let user = this
274 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
275 .await?;
276 (user, request.should_notify)
277 });
278 }
279
280 let mut outgoing_requests = Vec::new();
281 for requested_user_id in message.outgoing_requests {
282 outgoing_requests.push(
283 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
284 .await?,
285 );
286 }
287
288 let removed_contacts =
289 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
290 let removed_incoming_requests =
291 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
292 let removed_outgoing_requests =
293 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
294
295 this.update(&mut cx, |this, cx| {
296 // Remove contacts
297 this.contacts
298 .retain(|contact| !removed_contacts.contains(&contact.user.id));
299 // Update existing contacts and insert new ones
300 for (updated_contact, should_notify) in updated_contacts {
301 if should_notify {
302 cx.emit(Event::Contact {
303 user: updated_contact.user.clone(),
304 kind: ContactEventKind::Accepted,
305 });
306 }
307 match this.contacts.binary_search_by_key(
308 &&updated_contact.user.github_login,
309 |contact| &contact.user.github_login,
310 ) {
311 Ok(ix) => this.contacts[ix] = updated_contact,
312 Err(ix) => this.contacts.insert(ix, updated_contact),
313 }
314 }
315
316 // Remove incoming contact requests
317 this.incoming_contact_requests.retain(|user| {
318 if removed_incoming_requests.contains(&user.id) {
319 cx.emit(Event::Contact {
320 user: user.clone(),
321 kind: ContactEventKind::Cancelled,
322 });
323 false
324 } else {
325 true
326 }
327 });
328 // Update existing incoming requests and insert new ones
329 for (user, should_notify) in incoming_requests {
330 if should_notify {
331 cx.emit(Event::Contact {
332 user: user.clone(),
333 kind: ContactEventKind::Requested,
334 });
335 }
336
337 match this
338 .incoming_contact_requests
339 .binary_search_by_key(&&user.github_login, |contact| {
340 &contact.github_login
341 }) {
342 Ok(ix) => this.incoming_contact_requests[ix] = user,
343 Err(ix) => this.incoming_contact_requests.insert(ix, user),
344 }
345 }
346
347 // Remove outgoing contact requests
348 this.outgoing_contact_requests
349 .retain(|user| !removed_outgoing_requests.contains(&user.id));
350 // Update existing incoming requests and insert new ones
351 for request in outgoing_requests {
352 match this
353 .outgoing_contact_requests
354 .binary_search_by_key(&&request.github_login, |contact| {
355 &contact.github_login
356 }) {
357 Ok(ix) => this.outgoing_contact_requests[ix] = request,
358 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
359 }
360 }
361
362 cx.notify();
363 });
364
365 Ok(())
366 })
367 }
368 }
369 }
370
371 pub fn contacts(&self) -> &[Arc<Contact>] {
372 &self.contacts
373 }
374
375 pub fn has_contact(&self, user: &Arc<User>) -> bool {
376 self.contacts
377 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
378 .is_ok()
379 }
380
381 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
382 &self.incoming_contact_requests
383 }
384
385 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
386 &self.outgoing_contact_requests
387 }
388
389 pub fn is_contact_request_pending(&self, user: &User) -> bool {
390 self.pending_contact_requests.contains_key(&user.id)
391 }
392
393 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
394 if self
395 .contacts
396 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
397 .is_ok()
398 {
399 ContactRequestStatus::RequestAccepted
400 } else if self
401 .outgoing_contact_requests
402 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
403 .is_ok()
404 {
405 ContactRequestStatus::RequestSent
406 } else if self
407 .incoming_contact_requests
408 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
409 .is_ok()
410 {
411 ContactRequestStatus::RequestReceived
412 } else {
413 ContactRequestStatus::None
414 }
415 }
416
417 pub fn request_contact(
418 &mut self,
419 responder_id: u64,
420 cx: &mut ModelContext<Self>,
421 ) -> Task<Result<()>> {
422 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
423 }
424
425 pub fn remove_contact(
426 &mut self,
427 user_id: u64,
428 cx: &mut ModelContext<Self>,
429 ) -> Task<Result<()>> {
430 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
431 }
432
433 pub fn respond_to_contact_request(
434 &mut self,
435 requester_id: u64,
436 accept: bool,
437 cx: &mut ModelContext<Self>,
438 ) -> Task<Result<()>> {
439 self.perform_contact_request(
440 requester_id,
441 proto::RespondToContactRequest {
442 requester_id,
443 response: if accept {
444 proto::ContactRequestResponse::Accept
445 } else {
446 proto::ContactRequestResponse::Decline
447 } as i32,
448 },
449 cx,
450 )
451 }
452
453 pub fn dismiss_contact_request(
454 &mut self,
455 requester_id: u64,
456 cx: &mut ModelContext<Self>,
457 ) -> Task<Result<()>> {
458 let client = self.client.upgrade();
459 cx.spawn_weak(|_, _| async move {
460 client
461 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
462 .request(proto::RespondToContactRequest {
463 requester_id,
464 response: proto::ContactRequestResponse::Dismiss as i32,
465 })
466 .await?;
467 Ok(())
468 })
469 }
470
471 fn perform_contact_request<T: RequestMessage>(
472 &mut self,
473 user_id: u64,
474 request: T,
475 cx: &mut ModelContext<Self>,
476 ) -> Task<Result<()>> {
477 let client = self.client.upgrade();
478 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
479 cx.notify();
480
481 cx.spawn(|this, mut cx| async move {
482 let response = client
483 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
484 .request(request)
485 .await;
486 this.update(&mut cx, |this, cx| {
487 if let Entry::Occupied(mut request_count) =
488 this.pending_contact_requests.entry(user_id)
489 {
490 *request_count.get_mut() -= 1;
491 if *request_count.get() == 0 {
492 request_count.remove();
493 }
494 }
495 cx.notify();
496 });
497 response?;
498 Ok(())
499 })
500 }
501
502 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
503 let (tx, mut rx) = postage::barrier::channel();
504 self.update_contacts_tx
505 .unbounded_send(UpdateContacts::Clear(tx))
506 .unwrap();
507 async move {
508 rx.next().await;
509 }
510 }
511
512 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
513 let (tx, mut rx) = postage::barrier::channel();
514 self.update_contacts_tx
515 .unbounded_send(UpdateContacts::Wait(tx))
516 .unwrap();
517 async move {
518 rx.next().await;
519 }
520 }
521
522 pub fn get_users(
523 &mut self,
524 user_ids: Vec<u64>,
525 cx: &mut ModelContext<Self>,
526 ) -> Task<Result<Vec<Arc<User>>>> {
527 let mut user_ids_to_fetch = user_ids.clone();
528 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
529
530 cx.spawn(|this, mut cx| async move {
531 if !user_ids_to_fetch.is_empty() {
532 this.update(&mut cx, |this, cx| {
533 this.load_users(
534 proto::GetUsers {
535 user_ids: user_ids_to_fetch,
536 },
537 cx,
538 )
539 })
540 .await?;
541 }
542
543 this.read_with(&cx, |this, _| {
544 user_ids
545 .iter()
546 .map(|user_id| {
547 this.users
548 .get(user_id)
549 .cloned()
550 .ok_or_else(|| anyhow!("user {} not found", user_id))
551 })
552 .collect()
553 })
554 })
555 }
556
557 pub fn fuzzy_search_users(
558 &mut self,
559 query: String,
560 cx: &mut ModelContext<Self>,
561 ) -> Task<Result<Vec<Arc<User>>>> {
562 self.load_users(proto::FuzzySearchUsers { query }, cx)
563 }
564
565 pub fn get_user(
566 &mut self,
567 user_id: u64,
568 cx: &mut ModelContext<Self>,
569 ) -> Task<Result<Arc<User>>> {
570 if let Some(user) = self.users.get(&user_id).cloned() {
571 return cx.foreground().spawn(async move { Ok(user) });
572 }
573
574 let load_users = self.get_users(vec![user_id], cx);
575 cx.spawn(|this, mut cx| async move {
576 load_users.await?;
577 this.update(&mut cx, |this, _| {
578 this.users
579 .get(&user_id)
580 .cloned()
581 .ok_or_else(|| anyhow!("server responded with no users"))
582 })
583 })
584 }
585
586 pub fn current_user(&self) -> Option<Arc<User>> {
587 self.current_user.borrow().clone()
588 }
589
590 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
591 self.current_user.clone()
592 }
593
594 fn load_users(
595 &mut self,
596 request: impl RequestMessage<Response = UsersResponse>,
597 cx: &mut ModelContext<Self>,
598 ) -> Task<Result<Vec<Arc<User>>>> {
599 let client = self.client.clone();
600 let http = self.http.clone();
601 cx.spawn_weak(|this, mut cx| async move {
602 if let Some(rpc) = client.upgrade() {
603 let response = rpc.request(request).await.context("error loading users")?;
604 let users = future::join_all(
605 response
606 .users
607 .into_iter()
608 .map(|user| User::new(user, http.as_ref())),
609 )
610 .await;
611
612 if let Some(this) = this.upgrade(&cx) {
613 this.update(&mut cx, |this, _| {
614 for user in &users {
615 this.users.insert(user.id, user.clone());
616 }
617 });
618 }
619 Ok(users)
620 } else {
621 Ok(Vec::new())
622 }
623 })
624 }
625}
626
627impl User {
628 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
629 Arc::new(User {
630 id: message.id,
631 github_login: message.github_login,
632 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
633 })
634 }
635}
636
637impl Contact {
638 async fn from_proto(
639 contact: proto::Contact,
640 user_store: &ModelHandle<UserStore>,
641 cx: &mut AsyncAppContext,
642 ) -> Result<Self> {
643 let user = user_store
644 .update(cx, |user_store, cx| {
645 user_store.get_user(contact.user_id, cx)
646 })
647 .await?;
648 Ok(Self {
649 user,
650 online: contact.online,
651 busy: contact.busy,
652 })
653 }
654}
655
656async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
657 let mut response = http
658 .get(url, Default::default(), true)
659 .await
660 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
661
662 if !response.status().is_success() {
663 return Err(anyhow!("avatar request failed {:?}", response.status()));
664 }
665
666 let mut body = Vec::new();
667 response
668 .body_mut()
669 .read_to_end(&mut body)
670 .await
671 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
672 let format = image::guess_format(&body)?;
673 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
674 Ok(ImageData::new(image))
675}