1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use staff_mode::StaffMode;
9use std::sync::{Arc, Weak};
10use util::http::HttpClient;
11use util::TryFutureExt as _;
12
/// Numeric identifier of a user; an alias for `u64`.
pub type UserId = u64;
14
/// A user as cached by [`UserStore`].
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user; used as the key of the store's user cache.
    pub id: UserId,
    /// GitHub login of the user. The store keeps its contact and request
    /// lists sorted by this value.
    pub github_login: String,
    /// Avatar image, if one was loaded.
    pub avatar: Option<Arc<ImageData>>,
}
21
22impl PartialOrd for User {
23 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
24 Some(self.cmp(other))
25 }
26}
27
28impl Ord for User {
29 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
30 self.github_login.cmp(&other.github_login)
31 }
32}
33
34impl PartialEq for User {
35 fn eq(&self, other: &Self) -> bool {
36 self.id == other.id && self.github_login == other.github_login
37 }
38}
39
40impl Eq for User {}
41
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the server message.
    pub online: bool,
    /// `busy` flag copied from the server message.
    pub busy: bool,
}
48
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any tracked request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
56
/// Model that caches users and tracks the current user's contacts and
/// contact requests.
pub struct UserStore {
    /// Cache of loaded users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the currently signed-in user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id; an entry is
    /// removed when its count drops back to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded whenever a request is made.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<()>,
}
71
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message (presumably the number of invites;
    /// confirm against the server protocol).
    pub count: u32,
    /// `url` field of the message.
    pub url: Arc<str>,
}
77
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened regarding a contact or contact request of `user`;
    /// `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
85
/// What happened in an [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request arrived with `should_notify` set.
    Requested,
    /// A contact was added or updated with `should_notify` set.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
92
/// Registers `UserStore` as a gpui entity that emits [`Event`]s.
impl Entity for UserStore {
    type Event = Event;
}
96
/// Messages processed, in order, by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Drop the barrier sender once this message is reached, which signals
    /// that everything queued before it has been processed.
    Wait(postage::barrier::Sender),
    /// Clear all contacts and contact requests, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
102
103impl UserStore {
/// Creates the store and wires it up to `client`.
///
/// Registers message handlers for contact updates, invite-info updates and
/// show-contacts requests, and spawns two tasks held by the store:
///
/// * `_maintain_contacts` applies queued `UpdateContacts` messages one at a
///   time, awaiting each before taking the next.
/// * `_maintain_current_user` follows the client's status: on connect it
///   loads the current user and private user info; on sign-out or connection
///   loss it clears the contacts.
pub fn new(
    client: Arc<Client>,
    http: Arc<dyn HttpClient>,
    cx: &mut ModelContext<Self>,
) -> Self {
    let (mut current_user_tx, current_user_rx) = watch::channel();
    let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
    let rpc_subscriptions = vec![
        client.add_message_handler(cx.handle(), Self::handle_update_contacts),
        client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
        client.add_message_handler(cx.handle(), Self::handle_show_contacts),
    ];
    Self {
        users: Default::default(),
        current_user: current_user_rx,
        contacts: Default::default(),
        incoming_contact_requests: Default::default(),
        outgoing_contact_requests: Default::default(),
        invite_info: None,
        client: Arc::downgrade(&client),
        update_contacts_tx,
        http,
        _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
            // Moved into the task so the handler registrations live as long
            // as the task does.
            let _subscriptions = rpc_subscriptions;
            while let Some(message) = update_contacts_rx.next().await {
                // Messages that arrive after the store is gone are discarded.
                if let Some(this) = this.upgrade(&cx) {
                    this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                        .log_err()
                        .await;
                }
            }
        }),
        _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
            let mut status = client.status();
            while let Some(status) = status.next().await {
                match status {
                    Status::Connected { .. } => {
                        // Proceed only if the store is still alive and the
                        // client knows its user id.
                        if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                            let fetch_user = this
                                .update(&mut cx, |this, cx| this.get_user(user_id, cx))
                                .log_err();
                            let fetch_metrics_id =
                                client.request(proto::GetPrivateUserInfo {}).log_err();
                            // Run both requests concurrently.
                            let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
                            cx.read(|cx| {
                                client.telemetry.set_authenticated_user_info(
                                    info.as_ref().map(|info| info.metrics_id.clone()),
                                    info.as_ref().map(|info| info.staff).unwrap_or(false),
                                    cx,
                                )
                            });

                            cx.update(|cx| {
                                cx.update_default_global(|staff_mode: &mut StaffMode, _| {
                                    // Staff mode is only ever switched on
                                    // here; an already-enabled flag is left
                                    // untouched.
                                    if !staff_mode.0 {
                                        *staff_mode = StaffMode(
                                            info.as_ref()
                                                .map(|info| info.staff)
                                                .unwrap_or_default(),
                                        )
                                    }
                                    ()
                                });
                            });

                            current_user_tx.send(user).await.ok();

                            this.update(&mut cx, |_, cx| {
                                cx.notify();
                            });
                        }
                    }
                    Status::SignedOut => {
                        // Publish "no user", then wait for the contacts to
                        // be cleared.
                        current_user_tx.send(None).await.ok();
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })
                            .await;
                        }
                    }
                    Status::ConnectionLost => {
                        // The current user is kept; only contacts are cleared.
                        if let Some(this) = this.upgrade(&cx) {
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })
                            .await;
                        }
                    }
                    _ => {}
                }
            }
        }),
        pending_contact_requests: Default::default(),
    }
}
202
/// Empties the user cache (test support only).
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    HashMap::clear(&mut self.users);
}
207
208 async fn handle_update_invite_info(
209 this: ModelHandle<Self>,
210 message: TypedEnvelope<proto::UpdateInviteInfo>,
211 _: Arc<Client>,
212 mut cx: AsyncAppContext,
213 ) -> Result<()> {
214 this.update(&mut cx, |this, cx| {
215 this.invite_info = Some(InviteInfo {
216 url: Arc::from(message.payload.url),
217 count: message.payload.count,
218 });
219 cx.notify();
220 });
221 Ok(())
222 }
223
224 async fn handle_show_contacts(
225 this: ModelHandle<Self>,
226 _: TypedEnvelope<proto::ShowContacts>,
227 _: Arc<Client>,
228 mut cx: AsyncAppContext,
229 ) -> Result<()> {
230 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
231 Ok(())
232 }
233
234 pub fn invite_info(&self) -> Option<&InviteInfo> {
235 self.invite_info.as_ref()
236 }
237
238 async fn handle_update_contacts(
239 this: ModelHandle<Self>,
240 message: TypedEnvelope<proto::UpdateContacts>,
241 _: Arc<Client>,
242 mut cx: AsyncAppContext,
243 ) -> Result<()> {
244 this.update(&mut cx, |this, _| {
245 this.update_contacts_tx
246 .unbounded_send(UpdateContacts::Update(message.payload))
247 .unwrap();
248 });
249 Ok(())
250 }
251
/// Applies one queued `UpdateContacts` message to the store.
///
/// * `Wait` drops the barrier sender and does nothing else.
/// * `Clear` empties the contact and request lists, then drops the barrier
///   sender.
/// * `Update` loads every user referenced by the message, then applies the
///   removals and insertions to the three sorted lists, emits the
///   corresponding `Event::Contact` events and notifies observers.
///
/// The returned task fails if any referenced user cannot be loaded; in that
/// case none of the lists are modified.
fn update_contacts(
    &mut self,
    message: UpdateContacts,
    cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect the ids of every user mentioned in the message so they
            // can be loaded with a single `get_users` call.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(|this, mut cx| async move {
                load_users.await?;

                // Users are fetched in parallel above and cached in the call to get_users.
                // No need to parallelize here.
                let mut updated_contacts = Vec::new();
                for contact in message.contacts {
                    let should_notify = contact.should_notify;
                    updated_contacts.push((
                        Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                        should_notify,
                    ));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        let user = this
                            .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                            .await?;
                        (user, request.should_notify)
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                this.update(&mut cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones
                    for (updated_contact, should_notify) in updated_contacts {
                        if should_notify {
                            cx.emit(Event::Contact {
                                user: updated_contact.user.clone(),
                                kind: ContactEventKind::Accepted,
                            });
                        }
                        // The list is sorted by GitHub login: replace the
                        // entry in place on a hit, otherwise insert at the
                        // position that keeps it sorted.
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for (user, should_notify) in incoming_requests {
                        if should_notify {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Requested,
                            });
                        }

                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                });

                Ok(())
            })
        }
    }
}
392
393 pub fn contacts(&self) -> &[Arc<Contact>] {
394 &self.contacts
395 }
396
397 pub fn has_contact(&self, user: &Arc<User>) -> bool {
398 self.contacts
399 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
400 .is_ok()
401 }
402
403 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
404 &self.incoming_contact_requests
405 }
406
407 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
408 &self.outgoing_contact_requests
409 }
410
411 pub fn is_contact_request_pending(&self, user: &User) -> bool {
412 self.pending_contact_requests.contains_key(&user.id)
413 }
414
415 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
416 if self
417 .contacts
418 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
419 .is_ok()
420 {
421 ContactRequestStatus::RequestAccepted
422 } else if self
423 .outgoing_contact_requests
424 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
425 .is_ok()
426 {
427 ContactRequestStatus::RequestSent
428 } else if self
429 .incoming_contact_requests
430 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
431 .is_ok()
432 {
433 ContactRequestStatus::RequestReceived
434 } else {
435 ContactRequestStatus::None
436 }
437 }
438
439 pub fn request_contact(
440 &mut self,
441 responder_id: u64,
442 cx: &mut ModelContext<Self>,
443 ) -> Task<Result<()>> {
444 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
445 }
446
447 pub fn remove_contact(
448 &mut self,
449 user_id: u64,
450 cx: &mut ModelContext<Self>,
451 ) -> Task<Result<()>> {
452 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
453 }
454
455 pub fn respond_to_contact_request(
456 &mut self,
457 requester_id: u64,
458 accept: bool,
459 cx: &mut ModelContext<Self>,
460 ) -> Task<Result<()>> {
461 self.perform_contact_request(
462 requester_id,
463 proto::RespondToContactRequest {
464 requester_id,
465 response: if accept {
466 proto::ContactRequestResponse::Accept
467 } else {
468 proto::ContactRequestResponse::Decline
469 } as i32,
470 },
471 cx,
472 )
473 }
474
475 pub fn dismiss_contact_request(
476 &mut self,
477 requester_id: u64,
478 cx: &mut ModelContext<Self>,
479 ) -> Task<Result<()>> {
480 let client = self.client.upgrade();
481 cx.spawn_weak(|_, _| async move {
482 client
483 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
484 .request(proto::RespondToContactRequest {
485 requester_id,
486 response: proto::ContactRequestResponse::Dismiss as i32,
487 })
488 .await?;
489 Ok(())
490 })
491 }
492
493 fn perform_contact_request<T: RequestMessage>(
494 &mut self,
495 user_id: u64,
496 request: T,
497 cx: &mut ModelContext<Self>,
498 ) -> Task<Result<()>> {
499 let client = self.client.upgrade();
500 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
501 cx.notify();
502
503 cx.spawn(|this, mut cx| async move {
504 let response = client
505 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
506 .request(request)
507 .await;
508 this.update(&mut cx, |this, cx| {
509 if let Entry::Occupied(mut request_count) =
510 this.pending_contact_requests.entry(user_id)
511 {
512 *request_count.get_mut() -= 1;
513 if *request_count.get() == 0 {
514 request_count.remove();
515 }
516 }
517 cx.notify();
518 });
519 response?;
520 Ok(())
521 })
522 }
523
524 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
525 let (tx, mut rx) = postage::barrier::channel();
526 self.update_contacts_tx
527 .unbounded_send(UpdateContacts::Clear(tx))
528 .unwrap();
529 async move {
530 rx.next().await;
531 }
532 }
533
534 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
535 let (tx, mut rx) = postage::barrier::channel();
536 self.update_contacts_tx
537 .unbounded_send(UpdateContacts::Wait(tx))
538 .unwrap();
539 async move {
540 rx.next().await;
541 }
542 }
543
544 pub fn get_users(
545 &mut self,
546 user_ids: Vec<u64>,
547 cx: &mut ModelContext<Self>,
548 ) -> Task<Result<Vec<Arc<User>>>> {
549 let mut user_ids_to_fetch = user_ids.clone();
550 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
551
552 cx.spawn(|this, mut cx| async move {
553 if !user_ids_to_fetch.is_empty() {
554 this.update(&mut cx, |this, cx| {
555 this.load_users(
556 proto::GetUsers {
557 user_ids: user_ids_to_fetch,
558 },
559 cx,
560 )
561 })
562 .await?;
563 }
564
565 this.read_with(&cx, |this, _| {
566 user_ids
567 .iter()
568 .map(|user_id| {
569 this.users
570 .get(user_id)
571 .cloned()
572 .ok_or_else(|| anyhow!("user {} not found", user_id))
573 })
574 .collect()
575 })
576 })
577 }
578
579 pub fn fuzzy_search_users(
580 &mut self,
581 query: String,
582 cx: &mut ModelContext<Self>,
583 ) -> Task<Result<Vec<Arc<User>>>> {
584 self.load_users(proto::FuzzySearchUsers { query }, cx)
585 }
586
587 pub fn get_user(
588 &mut self,
589 user_id: u64,
590 cx: &mut ModelContext<Self>,
591 ) -> Task<Result<Arc<User>>> {
592 if let Some(user) = self.users.get(&user_id).cloned() {
593 return cx.foreground().spawn(async move { Ok(user) });
594 }
595
596 let load_users = self.get_users(vec![user_id], cx);
597 cx.spawn(|this, mut cx| async move {
598 load_users.await?;
599 this.update(&mut cx, |this, _| {
600 this.users
601 .get(&user_id)
602 .cloned()
603 .ok_or_else(|| anyhow!("server responded with no users"))
604 })
605 })
606 }
607
608 pub fn current_user(&self) -> Option<Arc<User>> {
609 self.current_user.borrow().clone()
610 }
611
612 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
613 self.current_user.clone()
614 }
615
616 fn load_users(
617 &mut self,
618 request: impl RequestMessage<Response = UsersResponse>,
619 cx: &mut ModelContext<Self>,
620 ) -> Task<Result<Vec<Arc<User>>>> {
621 let client = self.client.clone();
622 let http = self.http.clone();
623 cx.spawn_weak(|this, mut cx| async move {
624 if let Some(rpc) = client.upgrade() {
625 let response = rpc.request(request).await.context("error loading users")?;
626 let users = future::join_all(
627 response
628 .users
629 .into_iter()
630 .map(|user| User::new(user, http.as_ref())),
631 )
632 .await;
633
634 if let Some(this) = this.upgrade(&cx) {
635 this.update(&mut cx, |this, _| {
636 for user in &users {
637 this.users.insert(user.id, user.clone());
638 }
639 });
640 }
641 Ok(users)
642 } else {
643 Ok(Vec::new())
644 }
645 })
646 }
647}
648
649impl User {
650 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
651 Arc::new(User {
652 id: message.id,
653 github_login: message.github_login,
654 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
655 })
656 }
657}
658
659impl Contact {
660 async fn from_proto(
661 contact: proto::Contact,
662 user_store: &ModelHandle<UserStore>,
663 cx: &mut AsyncAppContext,
664 ) -> Result<Self> {
665 let user = user_store
666 .update(cx, |user_store, cx| {
667 user_store.get_user(contact.user_id, cx)
668 })
669 .await?;
670 Ok(Self {
671 user,
672 online: contact.online,
673 busy: contact.busy,
674 })
675 }
676}
677
678async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
679 let mut response = http
680 .get(url, Default::default(), true)
681 .await
682 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
683
684 if !response.status().is_success() {
685 return Err(anyhow!("avatar request failed {:?}", response.status()));
686 }
687
688 let mut body = Vec::new();
689 response
690 .body_mut()
691 .read_to_end(&mut body)
692 .await
693 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
694 let format = image::guess_format(&body)?;
695 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
696 Ok(ImageData::new(image))
697}