1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use settings::Settings;
9use staff_mode::StaffMode;
10use std::sync::{Arc, Weak};
11use util::http::HttpClient;
12use util::TryFutureExt as _;
13
14#[derive(Default, Debug)]
15pub struct User {
16 pub id: u64,
17 pub github_login: String,
18 pub avatar: Option<Arc<ImageData>>,
19}
20
21impl PartialOrd for User {
22 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
23 Some(self.cmp(other))
24 }
25}
26
27impl Ord for User {
28 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
29 self.github_login.cmp(&other.github_login)
30 }
31}
32
33impl PartialEq for User {
34 fn eq(&self, other: &Self) -> bool {
35 self.id == other.id && self.github_login == other.github_login
36 }
37}
38
39impl Eq for User {}
40
41#[derive(Debug, PartialEq)]
42pub struct Contact {
43 pub user: Arc<User>,
44 pub online: bool,
45 pub busy: bool,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum ContactRequestStatus {
50 None,
51 RequestSent,
52 RequestReceived,
53 RequestAccepted,
54}
55
56pub struct UserStore {
57 users: HashMap<u64, Arc<User>>,
58 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
59 current_user: watch::Receiver<Option<Arc<User>>>,
60 contacts: Vec<Arc<Contact>>,
61 incoming_contact_requests: Vec<Arc<User>>,
62 outgoing_contact_requests: Vec<Arc<User>>,
63 pending_contact_requests: HashMap<u64, usize>,
64 invite_info: Option<InviteInfo>,
65 client: Weak<Client>,
66 http: Arc<dyn HttpClient>,
67 _maintain_contacts: Task<()>,
68 _maintain_current_user: Task<()>,
69}
70
71#[derive(Clone)]
72pub struct InviteInfo {
73 pub count: u32,
74 pub url: Arc<str>,
75}
76
77pub enum Event {
78 Contact {
79 user: Arc<User>,
80 kind: ContactEventKind,
81 },
82 ShowContacts,
83}
84
85#[derive(Clone, Copy)]
86pub enum ContactEventKind {
87 Requested,
88 Accepted,
89 Cancelled,
90}
91
92impl Entity for UserStore {
93 type Event = Event;
94}
95
96enum UpdateContacts {
97 Update(proto::UpdateContacts),
98 Wait(postage::barrier::Sender),
99 Clear(postage::barrier::Sender),
100}
101
102impl UserStore {
103 pub fn new(
104 client: Arc<Client>,
105 http: Arc<dyn HttpClient>,
106 cx: &mut ModelContext<Self>,
107 ) -> Self {
108 let (mut current_user_tx, current_user_rx) = watch::channel();
109 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
110 let rpc_subscriptions = vec![
111 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
112 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
113 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
114 ];
115 Self {
116 users: Default::default(),
117 current_user: current_user_rx,
118 contacts: Default::default(),
119 incoming_contact_requests: Default::default(),
120 outgoing_contact_requests: Default::default(),
121 invite_info: None,
122 client: Arc::downgrade(&client),
123 update_contacts_tx,
124 http,
125 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
126 let _subscriptions = rpc_subscriptions;
127 while let Some(message) = update_contacts_rx.next().await {
128 if let Some(this) = this.upgrade(&cx) {
129 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
130 .log_err()
131 .await;
132 }
133 }
134 }),
135 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
136 let mut status = client.status();
137 while let Some(status) = status.next().await {
138 match status {
139 Status::Connected { .. } => {
140 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
141 let fetch_user = this
142 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
143 .log_err();
144 let fetch_metrics_id =
145 client.request(proto::GetPrivateUserInfo {}).log_err();
146 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
147 client.telemetry.set_authenticated_user_info(
148 info.as_ref().map(|info| info.metrics_id.clone()),
149 info.as_ref().map(|info| info.staff).unwrap_or(false),
150 cx.read(|cx| cx.global::<Settings>().telemetry()),
151 );
152
153 cx.update(|cx| {
154 cx.update_default_global(|staff_mode: &mut StaffMode, _| {
155 if !staff_mode.0 {
156 *staff_mode = StaffMode(
157 info.as_ref()
158 .map(|info| info.staff)
159 .unwrap_or_default(),
160 )
161 }
162 ()
163 });
164 });
165
166 current_user_tx.send(user).await.ok();
167 }
168 }
169 Status::SignedOut => {
170 current_user_tx.send(None).await.ok();
171 if let Some(this) = this.upgrade(&cx) {
172 this.update(&mut cx, |this, _| this.clear_contacts()).await;
173 }
174 }
175 Status::ConnectionLost => {
176 if let Some(this) = this.upgrade(&cx) {
177 this.update(&mut cx, |this, _| this.clear_contacts()).await;
178 }
179 }
180 _ => {}
181 }
182 }
183 }),
184 pending_contact_requests: Default::default(),
185 }
186 }
187
188 #[cfg(feature = "test-support")]
189 pub fn clear_cache(&mut self) {
190 self.users.clear();
191 }
192
193 async fn handle_update_invite_info(
194 this: ModelHandle<Self>,
195 message: TypedEnvelope<proto::UpdateInviteInfo>,
196 _: Arc<Client>,
197 mut cx: AsyncAppContext,
198 ) -> Result<()> {
199 this.update(&mut cx, |this, cx| {
200 this.invite_info = Some(InviteInfo {
201 url: Arc::from(message.payload.url),
202 count: message.payload.count,
203 });
204 cx.notify();
205 });
206 Ok(())
207 }
208
209 async fn handle_show_contacts(
210 this: ModelHandle<Self>,
211 _: TypedEnvelope<proto::ShowContacts>,
212 _: Arc<Client>,
213 mut cx: AsyncAppContext,
214 ) -> Result<()> {
215 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
216 Ok(())
217 }
218
219 pub fn invite_info(&self) -> Option<&InviteInfo> {
220 self.invite_info.as_ref()
221 }
222
223 async fn handle_update_contacts(
224 this: ModelHandle<Self>,
225 message: TypedEnvelope<proto::UpdateContacts>,
226 _: Arc<Client>,
227 mut cx: AsyncAppContext,
228 ) -> Result<()> {
229 this.update(&mut cx, |this, _| {
230 this.update_contacts_tx
231 .unbounded_send(UpdateContacts::Update(message.payload))
232 .unwrap();
233 });
234 Ok(())
235 }
236
237 fn update_contacts(
238 &mut self,
239 message: UpdateContacts,
240 cx: &mut ModelContext<Self>,
241 ) -> Task<Result<()>> {
242 match message {
243 UpdateContacts::Wait(barrier) => {
244 drop(barrier);
245 Task::ready(Ok(()))
246 }
247 UpdateContacts::Clear(barrier) => {
248 self.contacts.clear();
249 self.incoming_contact_requests.clear();
250 self.outgoing_contact_requests.clear();
251 drop(barrier);
252 Task::ready(Ok(()))
253 }
254 UpdateContacts::Update(message) => {
255 let mut user_ids = HashSet::default();
256 for contact in &message.contacts {
257 user_ids.insert(contact.user_id);
258 }
259 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
260 user_ids.extend(message.outgoing_requests.iter());
261
262 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
263 cx.spawn(|this, mut cx| async move {
264 load_users.await?;
265
266 // Users are fetched in parallel above and cached in call to get_users
267 // No need to paralellize here
268 let mut updated_contacts = Vec::new();
269 for contact in message.contacts {
270 let should_notify = contact.should_notify;
271 updated_contacts.push((
272 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
273 should_notify,
274 ));
275 }
276
277 let mut incoming_requests = Vec::new();
278 for request in message.incoming_requests {
279 incoming_requests.push({
280 let user = this
281 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
282 .await?;
283 (user, request.should_notify)
284 });
285 }
286
287 let mut outgoing_requests = Vec::new();
288 for requested_user_id in message.outgoing_requests {
289 outgoing_requests.push(
290 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
291 .await?,
292 );
293 }
294
295 let removed_contacts =
296 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
297 let removed_incoming_requests =
298 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
299 let removed_outgoing_requests =
300 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
301
302 this.update(&mut cx, |this, cx| {
303 // Remove contacts
304 this.contacts
305 .retain(|contact| !removed_contacts.contains(&contact.user.id));
306 // Update existing contacts and insert new ones
307 for (updated_contact, should_notify) in updated_contacts {
308 if should_notify {
309 cx.emit(Event::Contact {
310 user: updated_contact.user.clone(),
311 kind: ContactEventKind::Accepted,
312 });
313 }
314 match this.contacts.binary_search_by_key(
315 &&updated_contact.user.github_login,
316 |contact| &contact.user.github_login,
317 ) {
318 Ok(ix) => this.contacts[ix] = updated_contact,
319 Err(ix) => this.contacts.insert(ix, updated_contact),
320 }
321 }
322
323 // Remove incoming contact requests
324 this.incoming_contact_requests.retain(|user| {
325 if removed_incoming_requests.contains(&user.id) {
326 cx.emit(Event::Contact {
327 user: user.clone(),
328 kind: ContactEventKind::Cancelled,
329 });
330 false
331 } else {
332 true
333 }
334 });
335 // Update existing incoming requests and insert new ones
336 for (user, should_notify) in incoming_requests {
337 if should_notify {
338 cx.emit(Event::Contact {
339 user: user.clone(),
340 kind: ContactEventKind::Requested,
341 });
342 }
343
344 match this
345 .incoming_contact_requests
346 .binary_search_by_key(&&user.github_login, |contact| {
347 &contact.github_login
348 }) {
349 Ok(ix) => this.incoming_contact_requests[ix] = user,
350 Err(ix) => this.incoming_contact_requests.insert(ix, user),
351 }
352 }
353
354 // Remove outgoing contact requests
355 this.outgoing_contact_requests
356 .retain(|user| !removed_outgoing_requests.contains(&user.id));
357 // Update existing incoming requests and insert new ones
358 for request in outgoing_requests {
359 match this
360 .outgoing_contact_requests
361 .binary_search_by_key(&&request.github_login, |contact| {
362 &contact.github_login
363 }) {
364 Ok(ix) => this.outgoing_contact_requests[ix] = request,
365 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
366 }
367 }
368
369 cx.notify();
370 });
371
372 Ok(())
373 })
374 }
375 }
376 }
377
378 pub fn contacts(&self) -> &[Arc<Contact>] {
379 &self.contacts
380 }
381
382 pub fn has_contact(&self, user: &Arc<User>) -> bool {
383 self.contacts
384 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
385 .is_ok()
386 }
387
388 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
389 &self.incoming_contact_requests
390 }
391
392 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
393 &self.outgoing_contact_requests
394 }
395
396 pub fn is_contact_request_pending(&self, user: &User) -> bool {
397 self.pending_contact_requests.contains_key(&user.id)
398 }
399
400 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
401 if self
402 .contacts
403 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
404 .is_ok()
405 {
406 ContactRequestStatus::RequestAccepted
407 } else if self
408 .outgoing_contact_requests
409 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
410 .is_ok()
411 {
412 ContactRequestStatus::RequestSent
413 } else if self
414 .incoming_contact_requests
415 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
416 .is_ok()
417 {
418 ContactRequestStatus::RequestReceived
419 } else {
420 ContactRequestStatus::None
421 }
422 }
423
424 pub fn request_contact(
425 &mut self,
426 responder_id: u64,
427 cx: &mut ModelContext<Self>,
428 ) -> Task<Result<()>> {
429 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
430 }
431
432 pub fn remove_contact(
433 &mut self,
434 user_id: u64,
435 cx: &mut ModelContext<Self>,
436 ) -> Task<Result<()>> {
437 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
438 }
439
440 pub fn respond_to_contact_request(
441 &mut self,
442 requester_id: u64,
443 accept: bool,
444 cx: &mut ModelContext<Self>,
445 ) -> Task<Result<()>> {
446 self.perform_contact_request(
447 requester_id,
448 proto::RespondToContactRequest {
449 requester_id,
450 response: if accept {
451 proto::ContactRequestResponse::Accept
452 } else {
453 proto::ContactRequestResponse::Decline
454 } as i32,
455 },
456 cx,
457 )
458 }
459
460 pub fn dismiss_contact_request(
461 &mut self,
462 requester_id: u64,
463 cx: &mut ModelContext<Self>,
464 ) -> Task<Result<()>> {
465 let client = self.client.upgrade();
466 cx.spawn_weak(|_, _| async move {
467 client
468 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
469 .request(proto::RespondToContactRequest {
470 requester_id,
471 response: proto::ContactRequestResponse::Dismiss as i32,
472 })
473 .await?;
474 Ok(())
475 })
476 }
477
478 fn perform_contact_request<T: RequestMessage>(
479 &mut self,
480 user_id: u64,
481 request: T,
482 cx: &mut ModelContext<Self>,
483 ) -> Task<Result<()>> {
484 let client = self.client.upgrade();
485 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
486 cx.notify();
487
488 cx.spawn(|this, mut cx| async move {
489 let response = client
490 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
491 .request(request)
492 .await;
493 this.update(&mut cx, |this, cx| {
494 if let Entry::Occupied(mut request_count) =
495 this.pending_contact_requests.entry(user_id)
496 {
497 *request_count.get_mut() -= 1;
498 if *request_count.get() == 0 {
499 request_count.remove();
500 }
501 }
502 cx.notify();
503 });
504 response?;
505 Ok(())
506 })
507 }
508
509 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
510 let (tx, mut rx) = postage::barrier::channel();
511 self.update_contacts_tx
512 .unbounded_send(UpdateContacts::Clear(tx))
513 .unwrap();
514 async move {
515 rx.next().await;
516 }
517 }
518
519 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
520 let (tx, mut rx) = postage::barrier::channel();
521 self.update_contacts_tx
522 .unbounded_send(UpdateContacts::Wait(tx))
523 .unwrap();
524 async move {
525 rx.next().await;
526 }
527 }
528
529 pub fn get_users(
530 &mut self,
531 user_ids: Vec<u64>,
532 cx: &mut ModelContext<Self>,
533 ) -> Task<Result<Vec<Arc<User>>>> {
534 let mut user_ids_to_fetch = user_ids.clone();
535 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
536
537 cx.spawn(|this, mut cx| async move {
538 if !user_ids_to_fetch.is_empty() {
539 this.update(&mut cx, |this, cx| {
540 this.load_users(
541 proto::GetUsers {
542 user_ids: user_ids_to_fetch,
543 },
544 cx,
545 )
546 })
547 .await?;
548 }
549
550 this.read_with(&cx, |this, _| {
551 user_ids
552 .iter()
553 .map(|user_id| {
554 this.users
555 .get(user_id)
556 .cloned()
557 .ok_or_else(|| anyhow!("user {} not found", user_id))
558 })
559 .collect()
560 })
561 })
562 }
563
564 pub fn fuzzy_search_users(
565 &mut self,
566 query: String,
567 cx: &mut ModelContext<Self>,
568 ) -> Task<Result<Vec<Arc<User>>>> {
569 self.load_users(proto::FuzzySearchUsers { query }, cx)
570 }
571
572 pub fn get_user(
573 &mut self,
574 user_id: u64,
575 cx: &mut ModelContext<Self>,
576 ) -> Task<Result<Arc<User>>> {
577 if let Some(user) = self.users.get(&user_id).cloned() {
578 return cx.foreground().spawn(async move { Ok(user) });
579 }
580
581 let load_users = self.get_users(vec![user_id], cx);
582 cx.spawn(|this, mut cx| async move {
583 load_users.await?;
584 this.update(&mut cx, |this, _| {
585 this.users
586 .get(&user_id)
587 .cloned()
588 .ok_or_else(|| anyhow!("server responded with no users"))
589 })
590 })
591 }
592
593 pub fn current_user(&self) -> Option<Arc<User>> {
594 self.current_user.borrow().clone()
595 }
596
597 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
598 self.current_user.clone()
599 }
600
601 fn load_users(
602 &mut self,
603 request: impl RequestMessage<Response = UsersResponse>,
604 cx: &mut ModelContext<Self>,
605 ) -> Task<Result<Vec<Arc<User>>>> {
606 let client = self.client.clone();
607 let http = self.http.clone();
608 cx.spawn_weak(|this, mut cx| async move {
609 if let Some(rpc) = client.upgrade() {
610 let response = rpc.request(request).await.context("error loading users")?;
611 let users = future::join_all(
612 response
613 .users
614 .into_iter()
615 .map(|user| User::new(user, http.as_ref())),
616 )
617 .await;
618
619 if let Some(this) = this.upgrade(&cx) {
620 this.update(&mut cx, |this, _| {
621 for user in &users {
622 this.users.insert(user.id, user.clone());
623 }
624 });
625 }
626 Ok(users)
627 } else {
628 Ok(Vec::new())
629 }
630 })
631 }
632}
633
634impl User {
635 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
636 Arc::new(User {
637 id: message.id,
638 github_login: message.github_login,
639 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
640 })
641 }
642}
643
644impl Contact {
645 async fn from_proto(
646 contact: proto::Contact,
647 user_store: &ModelHandle<UserStore>,
648 cx: &mut AsyncAppContext,
649 ) -> Result<Self> {
650 let user = user_store
651 .update(cx, |user_store, cx| {
652 user_store.get_user(contact.user_id, cx)
653 })
654 .await?;
655 Ok(Self {
656 user,
657 online: contact.online,
658 busy: contact.busy,
659 })
660 }
661}
662
663async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
664 let mut response = http
665 .get(url, Default::default(), true)
666 .await
667 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
668
669 if !response.status().is_success() {
670 return Err(anyhow!("avatar request failed {:?}", response.status()));
671 }
672
673 let mut body = Vec::new();
674 response
675 .body_mut()
676 .read_to_end(&mut body)
677 .await
678 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
679 let format = image::guess_format(&body)?;
680 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
681 Ok(ImageData::new(image))
682}