1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use staff_mode::StaffMode;
9use std::sync::{Arc, Weak};
10use util::http::HttpClient;
11use util::TryFutureExt as _;
12
/// A user known to this client.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id, taken from the `id` field of `proto::User`.
    pub id: u64,
    /// GitHub login name; also the key the contact lists are sorted by.
    pub github_login: String,
    /// Decoded avatar image, or `None` when fetching/decoding the avatar failed.
    pub avatar: Option<Arc<ImageData>>,
}
19
20impl PartialOrd for User {
21 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
22 Some(self.cmp(other))
23 }
24}
25
26impl Ord for User {
27 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28 self.github_login.cmp(&other.github_login)
29 }
30}
31
32impl PartialEq for User {
33 fn eq(&self, other: &Self) -> bool {
34 self.id == other.id && self.github_login == other.github_login
35 }
36}
37
// Marker impl: `User` equality (id + github_login) is a full equivalence relation.
impl Eq for User {}
39
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Copied from the `online` field of the proto message.
    pub online: bool,
    /// Copied from the `busy` field of the proto message.
    pub busy: bool,
}
46
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact / request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is already in the contacts list.
    RequestAccepted,
}
54
/// Model holding the users, contacts and contact requests known to this client.
pub struct UserStore {
    /// Cache of loaded users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the currently signed-in user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by `github_login` (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related RPC requests per user id; an entry
    /// is removed once its count drops back to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded on demand when issuing requests.
    client: Weak<Client>,
    /// HTTP client used to download user avatars.
    http: Arc<dyn HttpClient>,
    /// Background task applying queued `UpdateContacts` messages.
    _maintain_contacts: Task<()>,
    /// Background task following the client's connection status.
    _maintain_current_user: Task<()>,
}
69
/// Invite information received through a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    /// NOTE(review): presumably the number of remaining invites — confirm against the server.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
75
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something changed for `user` while applying a contacts update; `kind`
    /// says what happened.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
83
/// What happened to a contact, carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request flagged `should_notify` was received.
    Requested,
    /// A contact flagged `should_notify` was added or updated.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
90
// Makes `UserStore` a gpui model whose emitted events are of type `Event`.
impl Entity for UserStore {
    type Event = Event;
}
94
/// Messages queued on `update_contacts_tx` and applied in order by
/// `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
100
101impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client`, and
    /// spawns the two background tasks that keep it up to date:
    ///
    /// * `_maintain_contacts` drains the internal contacts queue and applies
    ///   each message through `update_contacts`, awaiting one before taking
    ///   the next.
    /// * `_maintain_current_user` follows `client.status()`: on connect it
    ///   loads the current user and private user info; on sign-out or
    ///   connection loss it clears the contact lists.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Moved into the task so the subscriptions live as long as it does.
                // NOTE(review): presumably dropping them unregisters the handlers — confirm.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Messages arriving after the store has been dropped are discarded.
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            // Needs both a live store and a known user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                let fetch_user = this
                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
                                    .log_err();
                                let fetch_metrics_id =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Both lookups run concurrently; each is `None` on failure.
                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
                                cx.read(|cx| {
                                    client.telemetry.set_authenticated_user_info(
                                        info.as_ref().map(|info| info.metrics_id.clone()),
                                        info.as_ref().map(|info| info.staff).unwrap_or(false),
                                        cx,
                                    )
                                });

                                cx.update(|cx| {
                                    cx.update_default_global(|staff_mode: &mut StaffMode, _| {
                                        // Staff mode is only ever switched on here, never off.
                                        if !staff_mode.0 {
                                            *staff_mode = StaffMode(
                                                info.as_ref()
                                                    .map(|info| info.staff)
                                                    .unwrap_or_default(),
                                            )
                                        }
                                        ()
                                    });
                                });

                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        Status::ConnectionLost => {
                            // Unlike sign-out, the current user is left as is.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
188
    /// Empties the user cache. Only compiled with the `test-support` feature.
    #[cfg(feature = "test-support")]
    pub fn clear_cache(&mut self) {
        self.users.clear();
    }
193
194 async fn handle_update_invite_info(
195 this: ModelHandle<Self>,
196 message: TypedEnvelope<proto::UpdateInviteInfo>,
197 _: Arc<Client>,
198 mut cx: AsyncAppContext,
199 ) -> Result<()> {
200 this.update(&mut cx, |this, cx| {
201 this.invite_info = Some(InviteInfo {
202 url: Arc::from(message.payload.url),
203 count: message.payload.count,
204 });
205 cx.notify();
206 });
207 Ok(())
208 }
209
210 async fn handle_show_contacts(
211 this: ModelHandle<Self>,
212 _: TypedEnvelope<proto::ShowContacts>,
213 _: Arc<Client>,
214 mut cx: AsyncAppContext,
215 ) -> Result<()> {
216 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
217 Ok(())
218 }
219
    /// Returns the most recently received invite info, or `None` if no
    /// `UpdateInviteInfo` message has been handled yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
223
224 async fn handle_update_contacts(
225 this: ModelHandle<Self>,
226 message: TypedEnvelope<proto::UpdateContacts>,
227 _: Arc<Client>,
228 mut cx: AsyncAppContext,
229 ) -> Result<()> {
230 this.update(&mut cx, |this, _| {
231 this.update_contacts_tx
232 .unbounded_send(UpdateContacts::Update(message.payload))
233 .unwrap();
234 });
235 Ok(())
236 }
237
    /// Applies one queued `UpdateContacts` message.
    ///
    /// `Wait` and `Clear` complete immediately. `Update` first loads every
    /// user mentioned in the message, then applies removals and
    /// insertions to the contact and request lists, keeping each list sorted
    /// by `github_login`, and finally notifies observers. The returned task
    /// fails if any referenced user cannot be loaded.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Dropping the sender is the only effect; presumably this is what
                // releases the future returned by `contact_updates_done`.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                // NOTE(review): unlike the `Update` arm, no `cx.notify()` is issued
                // here — confirm observers do not rely on one.
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id referenced by the message (deduplicated).
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // All list mutations happen in a single synchronous update.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // Binary search keeps the list sorted by github_login.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
378
379 pub fn contacts(&self) -> &[Arc<Contact>] {
380 &self.contacts
381 }
382
383 pub fn has_contact(&self, user: &Arc<User>) -> bool {
384 self.contacts
385 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
386 .is_ok()
387 }
388
389 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
390 &self.incoming_contact_requests
391 }
392
393 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
394 &self.outgoing_contact_requests
395 }
396
    /// Returns whether at least one contact-related RPC request for `user`
    /// (issued through `perform_contact_request`) is still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
400
401 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
402 if self
403 .contacts
404 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
405 .is_ok()
406 {
407 ContactRequestStatus::RequestAccepted
408 } else if self
409 .outgoing_contact_requests
410 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
411 .is_ok()
412 {
413 ContactRequestStatus::RequestSent
414 } else if self
415 .incoming_contact_requests
416 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
417 .is_ok()
418 {
419 ContactRequestStatus::RequestReceived
420 } else {
421 ContactRequestStatus::None
422 }
423 }
424
425 pub fn request_contact(
426 &mut self,
427 responder_id: u64,
428 cx: &mut ModelContext<Self>,
429 ) -> Task<Result<()>> {
430 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
431 }
432
433 pub fn remove_contact(
434 &mut self,
435 user_id: u64,
436 cx: &mut ModelContext<Self>,
437 ) -> Task<Result<()>> {
438 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
439 }
440
441 pub fn respond_to_contact_request(
442 &mut self,
443 requester_id: u64,
444 accept: bool,
445 cx: &mut ModelContext<Self>,
446 ) -> Task<Result<()>> {
447 self.perform_contact_request(
448 requester_id,
449 proto::RespondToContactRequest {
450 requester_id,
451 response: if accept {
452 proto::ContactRequestResponse::Accept
453 } else {
454 proto::ContactRequestResponse::Decline
455 } as i32,
456 },
457 cx,
458 )
459 }
460
461 pub fn dismiss_contact_request(
462 &mut self,
463 requester_id: u64,
464 cx: &mut ModelContext<Self>,
465 ) -> Task<Result<()>> {
466 let client = self.client.upgrade();
467 cx.spawn_weak(|_, _| async move {
468 client
469 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
470 .request(proto::RespondToContactRequest {
471 requester_id,
472 response: proto::ContactRequestResponse::Dismiss as i32,
473 })
474 .await?;
475 Ok(())
476 })
477 }
478
479 fn perform_contact_request<T: RequestMessage>(
480 &mut self,
481 user_id: u64,
482 request: T,
483 cx: &mut ModelContext<Self>,
484 ) -> Task<Result<()>> {
485 let client = self.client.upgrade();
486 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
487 cx.notify();
488
489 cx.spawn(|this, mut cx| async move {
490 let response = client
491 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
492 .request(request)
493 .await;
494 this.update(&mut cx, |this, cx| {
495 if let Entry::Occupied(mut request_count) =
496 this.pending_contact_requests.entry(user_id)
497 {
498 *request_count.get_mut() -= 1;
499 if *request_count.get() == 0 {
500 request_count.remove();
501 }
502 }
503 cx.notify();
504 });
505 response?;
506 Ok(())
507 })
508 }
509
510 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
511 let (tx, mut rx) = postage::barrier::channel();
512 self.update_contacts_tx
513 .unbounded_send(UpdateContacts::Clear(tx))
514 .unwrap();
515 async move {
516 rx.next().await;
517 }
518 }
519
520 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
521 let (tx, mut rx) = postage::barrier::channel();
522 self.update_contacts_tx
523 .unbounded_send(UpdateContacts::Wait(tx))
524 .unwrap();
525 async move {
526 rx.next().await;
527 }
528 }
529
530 pub fn get_users(
531 &mut self,
532 user_ids: Vec<u64>,
533 cx: &mut ModelContext<Self>,
534 ) -> Task<Result<Vec<Arc<User>>>> {
535 let mut user_ids_to_fetch = user_ids.clone();
536 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
537
538 cx.spawn(|this, mut cx| async move {
539 if !user_ids_to_fetch.is_empty() {
540 this.update(&mut cx, |this, cx| {
541 this.load_users(
542 proto::GetUsers {
543 user_ids: user_ids_to_fetch,
544 },
545 cx,
546 )
547 })
548 .await?;
549 }
550
551 this.read_with(&cx, |this, _| {
552 user_ids
553 .iter()
554 .map(|user_id| {
555 this.users
556 .get(user_id)
557 .cloned()
558 .ok_or_else(|| anyhow!("user {} not found", user_id))
559 })
560 .collect()
561 })
562 })
563 }
564
565 pub fn fuzzy_search_users(
566 &mut self,
567 query: String,
568 cx: &mut ModelContext<Self>,
569 ) -> Task<Result<Vec<Arc<User>>>> {
570 self.load_users(proto::FuzzySearchUsers { query }, cx)
571 }
572
573 pub fn get_user(
574 &mut self,
575 user_id: u64,
576 cx: &mut ModelContext<Self>,
577 ) -> Task<Result<Arc<User>>> {
578 if let Some(user) = self.users.get(&user_id).cloned() {
579 return cx.foreground().spawn(async move { Ok(user) });
580 }
581
582 let load_users = self.get_users(vec![user_id], cx);
583 cx.spawn(|this, mut cx| async move {
584 load_users.await?;
585 this.update(&mut cx, |this, _| {
586 this.users
587 .get(&user_id)
588 .cloned()
589 .ok_or_else(|| anyhow!("server responded with no users"))
590 })
591 })
592 }
593
    /// Returns a snapshot of the current user as last published on the watch
    /// channel (`None` after sign-out or before the first successful connect).
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
597
    /// Returns a new receiver on the current-user watch channel, so callers
    /// can observe changes to the signed-in user.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
601
602 fn load_users(
603 &mut self,
604 request: impl RequestMessage<Response = UsersResponse>,
605 cx: &mut ModelContext<Self>,
606 ) -> Task<Result<Vec<Arc<User>>>> {
607 let client = self.client.clone();
608 let http = self.http.clone();
609 cx.spawn_weak(|this, mut cx| async move {
610 if let Some(rpc) = client.upgrade() {
611 let response = rpc.request(request).await.context("error loading users")?;
612 let users = future::join_all(
613 response
614 .users
615 .into_iter()
616 .map(|user| User::new(user, http.as_ref())),
617 )
618 .await;
619
620 if let Some(this) = this.upgrade(&cx) {
621 this.update(&mut cx, |this, _| {
622 for user in &users {
623 this.users.insert(user.id, user.clone());
624 }
625 });
626 }
627 Ok(users)
628 } else {
629 Ok(Vec::new())
630 }
631 })
632 }
633}
634
635impl User {
636 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
637 Arc::new(User {
638 id: message.id,
639 github_login: message.github_login,
640 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
641 })
642 }
643}
644
645impl Contact {
646 async fn from_proto(
647 contact: proto::Contact,
648 user_store: &ModelHandle<UserStore>,
649 cx: &mut AsyncAppContext,
650 ) -> Result<Self> {
651 let user = user_store
652 .update(cx, |user_store, cx| {
653 user_store.get_user(contact.user_id, cx)
654 })
655 .await?;
656 Ok(Self {
657 user,
658 online: contact.online,
659 busy: contact.busy,
660 })
661 }
662}
663
664async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
665 let mut response = http
666 .get(url, Default::default(), true)
667 .await
668 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
669
670 if !response.status().is_success() {
671 return Err(anyhow!("avatar request failed {:?}", response.status()));
672 }
673
674 let mut body = Vec::new();
675 response
676 .body_mut()
677 .read_to_end(&mut body)
678 .await
679 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
680 let format = image::guess_format(&body)?;
681 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
682 Ok(ImageData::new(image))
683}