1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use settings::Settings;
9use std::sync::{Arc, Weak};
10use util::{paths::StaffMode, TryFutureExt as _};
11
12#[derive(Default, Debug)]
13pub struct User {
14 pub id: u64,
15 pub github_login: String,
16 pub avatar: Option<Arc<ImageData>>,
17}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
37impl Eq for User {}
38
39#[derive(Debug, PartialEq)]
40pub struct Contact {
41 pub user: Arc<User>,
42 pub online: bool,
43 pub busy: bool,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum ContactRequestStatus {
48 None,
49 RequestSent,
50 RequestReceived,
51 RequestAccepted,
52}
53
54pub struct UserStore {
55 users: HashMap<u64, Arc<User>>,
56 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
57 current_user: watch::Receiver<Option<Arc<User>>>,
58 contacts: Vec<Arc<Contact>>,
59 incoming_contact_requests: Vec<Arc<User>>,
60 outgoing_contact_requests: Vec<Arc<User>>,
61 pending_contact_requests: HashMap<u64, usize>,
62 invite_info: Option<InviteInfo>,
63 client: Weak<Client>,
64 http: Arc<dyn HttpClient>,
65 _maintain_contacts: Task<()>,
66 _maintain_current_user: Task<()>,
67}
68
69#[derive(Clone)]
70pub struct InviteInfo {
71 pub count: u32,
72 pub url: Arc<str>,
73}
74
75pub enum Event {
76 Contact {
77 user: Arc<User>,
78 kind: ContactEventKind,
79 },
80 ShowContacts,
81}
82
83#[derive(Clone, Copy)]
84pub enum ContactEventKind {
85 Requested,
86 Accepted,
87 Cancelled,
88}
89
90impl Entity for UserStore {
91 type Event = Event;
92}
93
94enum UpdateContacts {
95 Update(proto::UpdateContacts),
96 Wait(postage::barrier::Sender),
97 Clear(postage::barrier::Sender),
98}
99
100impl UserStore {
101 pub fn new(
102 client: Arc<Client>,
103 http: Arc<dyn HttpClient>,
104 cx: &mut ModelContext<Self>,
105 ) -> Self {
106 let (mut current_user_tx, current_user_rx) = watch::channel();
107 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
108 let rpc_subscriptions = vec![
109 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
110 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
111 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
112 ];
113 Self {
114 users: Default::default(),
115 current_user: current_user_rx,
116 contacts: Default::default(),
117 incoming_contact_requests: Default::default(),
118 outgoing_contact_requests: Default::default(),
119 invite_info: None,
120 client: Arc::downgrade(&client),
121 update_contacts_tx,
122 http,
123 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
124 let _subscriptions = rpc_subscriptions;
125 while let Some(message) = update_contacts_rx.next().await {
126 if let Some(this) = this.upgrade(&cx) {
127 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
128 .log_err()
129 .await;
130 }
131 }
132 }),
133 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
134 let mut status = client.status();
135 while let Some(status) = status.next().await {
136 match status {
137 Status::Connected { .. } => {
138 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
139 let fetch_user = this
140 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
141 .log_err();
142 let fetch_metrics_id =
143 client.request(proto::GetPrivateUserInfo {}).log_err();
144 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
145 client.telemetry.set_authenticated_user_info(
146 info.as_ref().map(|info| info.metrics_id.clone()),
147 info.as_ref().map(|info| info.staff).unwrap_or(false),
148 cx.read(|cx| cx.global::<Settings>().telemetry()),
149 );
150
151 cx.update(|cx| {
152 cx.set_global(
153 info.as_ref()
154 .map(|info| StaffMode(info.staff))
155 .unwrap_or(StaffMode(false)),
156 );
157 });
158
159 current_user_tx.send(user).await.ok();
160 }
161 }
162 Status::SignedOut => {
163 current_user_tx.send(None).await.ok();
164 if let Some(this) = this.upgrade(&cx) {
165 this.update(&mut cx, |this, _| this.clear_contacts()).await;
166 }
167 }
168 Status::ConnectionLost => {
169 if let Some(this) = this.upgrade(&cx) {
170 this.update(&mut cx, |this, _| this.clear_contacts()).await;
171 }
172 }
173 _ => {}
174 }
175 }
176 }),
177 pending_contact_requests: Default::default(),
178 }
179 }
180
181 async fn handle_update_invite_info(
182 this: ModelHandle<Self>,
183 message: TypedEnvelope<proto::UpdateInviteInfo>,
184 _: Arc<Client>,
185 mut cx: AsyncAppContext,
186 ) -> Result<()> {
187 this.update(&mut cx, |this, cx| {
188 this.invite_info = Some(InviteInfo {
189 url: Arc::from(message.payload.url),
190 count: message.payload.count,
191 });
192 cx.notify();
193 });
194 Ok(())
195 }
196
197 async fn handle_show_contacts(
198 this: ModelHandle<Self>,
199 _: TypedEnvelope<proto::ShowContacts>,
200 _: Arc<Client>,
201 mut cx: AsyncAppContext,
202 ) -> Result<()> {
203 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
204 Ok(())
205 }
206
207 pub fn invite_info(&self) -> Option<&InviteInfo> {
208 self.invite_info.as_ref()
209 }
210
211 async fn handle_update_contacts(
212 this: ModelHandle<Self>,
213 message: TypedEnvelope<proto::UpdateContacts>,
214 _: Arc<Client>,
215 mut cx: AsyncAppContext,
216 ) -> Result<()> {
217 this.update(&mut cx, |this, _| {
218 this.update_contacts_tx
219 .unbounded_send(UpdateContacts::Update(message.payload))
220 .unwrap();
221 });
222 Ok(())
223 }
224
225 fn update_contacts(
226 &mut self,
227 message: UpdateContacts,
228 cx: &mut ModelContext<Self>,
229 ) -> Task<Result<()>> {
230 match message {
231 UpdateContacts::Wait(barrier) => {
232 drop(barrier);
233 Task::ready(Ok(()))
234 }
235 UpdateContacts::Clear(barrier) => {
236 self.contacts.clear();
237 self.incoming_contact_requests.clear();
238 self.outgoing_contact_requests.clear();
239 drop(barrier);
240 Task::ready(Ok(()))
241 }
242 UpdateContacts::Update(message) => {
243 let mut user_ids = HashSet::default();
244 for contact in &message.contacts {
245 user_ids.insert(contact.user_id);
246 }
247 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
248 user_ids.extend(message.outgoing_requests.iter());
249
250 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
251 cx.spawn(|this, mut cx| async move {
252 load_users.await?;
253
254 // Users are fetched in parallel above and cached in call to get_users
255 // No need to paralellize here
256 let mut updated_contacts = Vec::new();
257 for contact in message.contacts {
258 let should_notify = contact.should_notify;
259 updated_contacts.push((
260 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
261 should_notify,
262 ));
263 }
264
265 let mut incoming_requests = Vec::new();
266 for request in message.incoming_requests {
267 incoming_requests.push({
268 let user = this
269 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
270 .await?;
271 (user, request.should_notify)
272 });
273 }
274
275 let mut outgoing_requests = Vec::new();
276 for requested_user_id in message.outgoing_requests {
277 outgoing_requests.push(
278 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
279 .await?,
280 );
281 }
282
283 let removed_contacts =
284 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
285 let removed_incoming_requests =
286 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
287 let removed_outgoing_requests =
288 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
289
290 this.update(&mut cx, |this, cx| {
291 // Remove contacts
292 this.contacts
293 .retain(|contact| !removed_contacts.contains(&contact.user.id));
294 // Update existing contacts and insert new ones
295 for (updated_contact, should_notify) in updated_contacts {
296 if should_notify {
297 cx.emit(Event::Contact {
298 user: updated_contact.user.clone(),
299 kind: ContactEventKind::Accepted,
300 });
301 }
302 match this.contacts.binary_search_by_key(
303 &&updated_contact.user.github_login,
304 |contact| &contact.user.github_login,
305 ) {
306 Ok(ix) => this.contacts[ix] = updated_contact,
307 Err(ix) => this.contacts.insert(ix, updated_contact),
308 }
309 }
310
311 // Remove incoming contact requests
312 this.incoming_contact_requests.retain(|user| {
313 if removed_incoming_requests.contains(&user.id) {
314 cx.emit(Event::Contact {
315 user: user.clone(),
316 kind: ContactEventKind::Cancelled,
317 });
318 false
319 } else {
320 true
321 }
322 });
323 // Update existing incoming requests and insert new ones
324 for (user, should_notify) in incoming_requests {
325 if should_notify {
326 cx.emit(Event::Contact {
327 user: user.clone(),
328 kind: ContactEventKind::Requested,
329 });
330 }
331
332 match this
333 .incoming_contact_requests
334 .binary_search_by_key(&&user.github_login, |contact| {
335 &contact.github_login
336 }) {
337 Ok(ix) => this.incoming_contact_requests[ix] = user,
338 Err(ix) => this.incoming_contact_requests.insert(ix, user),
339 }
340 }
341
342 // Remove outgoing contact requests
343 this.outgoing_contact_requests
344 .retain(|user| !removed_outgoing_requests.contains(&user.id));
345 // Update existing incoming requests and insert new ones
346 for request in outgoing_requests {
347 match this
348 .outgoing_contact_requests
349 .binary_search_by_key(&&request.github_login, |contact| {
350 &contact.github_login
351 }) {
352 Ok(ix) => this.outgoing_contact_requests[ix] = request,
353 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
354 }
355 }
356
357 cx.notify();
358 });
359
360 Ok(())
361 })
362 }
363 }
364 }
365
366 pub fn contacts(&self) -> &[Arc<Contact>] {
367 &self.contacts
368 }
369
370 pub fn has_contact(&self, user: &Arc<User>) -> bool {
371 self.contacts
372 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
373 .is_ok()
374 }
375
376 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
377 &self.incoming_contact_requests
378 }
379
380 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
381 &self.outgoing_contact_requests
382 }
383
384 pub fn is_contact_request_pending(&self, user: &User) -> bool {
385 self.pending_contact_requests.contains_key(&user.id)
386 }
387
388 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
389 if self
390 .contacts
391 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
392 .is_ok()
393 {
394 ContactRequestStatus::RequestAccepted
395 } else if self
396 .outgoing_contact_requests
397 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
398 .is_ok()
399 {
400 ContactRequestStatus::RequestSent
401 } else if self
402 .incoming_contact_requests
403 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
404 .is_ok()
405 {
406 ContactRequestStatus::RequestReceived
407 } else {
408 ContactRequestStatus::None
409 }
410 }
411
412 pub fn request_contact(
413 &mut self,
414 responder_id: u64,
415 cx: &mut ModelContext<Self>,
416 ) -> Task<Result<()>> {
417 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
418 }
419
420 pub fn remove_contact(
421 &mut self,
422 user_id: u64,
423 cx: &mut ModelContext<Self>,
424 ) -> Task<Result<()>> {
425 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
426 }
427
428 pub fn respond_to_contact_request(
429 &mut self,
430 requester_id: u64,
431 accept: bool,
432 cx: &mut ModelContext<Self>,
433 ) -> Task<Result<()>> {
434 self.perform_contact_request(
435 requester_id,
436 proto::RespondToContactRequest {
437 requester_id,
438 response: if accept {
439 proto::ContactRequestResponse::Accept
440 } else {
441 proto::ContactRequestResponse::Decline
442 } as i32,
443 },
444 cx,
445 )
446 }
447
448 pub fn dismiss_contact_request(
449 &mut self,
450 requester_id: u64,
451 cx: &mut ModelContext<Self>,
452 ) -> Task<Result<()>> {
453 let client = self.client.upgrade();
454 cx.spawn_weak(|_, _| async move {
455 client
456 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
457 .request(proto::RespondToContactRequest {
458 requester_id,
459 response: proto::ContactRequestResponse::Dismiss as i32,
460 })
461 .await?;
462 Ok(())
463 })
464 }
465
466 fn perform_contact_request<T: RequestMessage>(
467 &mut self,
468 user_id: u64,
469 request: T,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<()>> {
472 let client = self.client.upgrade();
473 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
474 cx.notify();
475
476 cx.spawn(|this, mut cx| async move {
477 let response = client
478 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
479 .request(request)
480 .await;
481 this.update(&mut cx, |this, cx| {
482 if let Entry::Occupied(mut request_count) =
483 this.pending_contact_requests.entry(user_id)
484 {
485 *request_count.get_mut() -= 1;
486 if *request_count.get() == 0 {
487 request_count.remove();
488 }
489 }
490 cx.notify();
491 });
492 response?;
493 Ok(())
494 })
495 }
496
497 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
498 let (tx, mut rx) = postage::barrier::channel();
499 self.update_contacts_tx
500 .unbounded_send(UpdateContacts::Clear(tx))
501 .unwrap();
502 async move {
503 rx.next().await;
504 }
505 }
506
507 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
508 let (tx, mut rx) = postage::barrier::channel();
509 self.update_contacts_tx
510 .unbounded_send(UpdateContacts::Wait(tx))
511 .unwrap();
512 async move {
513 rx.next().await;
514 }
515 }
516
517 pub fn get_users(
518 &mut self,
519 user_ids: Vec<u64>,
520 cx: &mut ModelContext<Self>,
521 ) -> Task<Result<Vec<Arc<User>>>> {
522 let mut user_ids_to_fetch = user_ids.clone();
523 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
524
525 cx.spawn(|this, mut cx| async move {
526 if !user_ids_to_fetch.is_empty() {
527 this.update(&mut cx, |this, cx| {
528 this.load_users(
529 proto::GetUsers {
530 user_ids: user_ids_to_fetch,
531 },
532 cx,
533 )
534 })
535 .await?;
536 }
537
538 this.read_with(&cx, |this, _| {
539 user_ids
540 .iter()
541 .map(|user_id| {
542 this.users
543 .get(user_id)
544 .cloned()
545 .ok_or_else(|| anyhow!("user {} not found", user_id))
546 })
547 .collect()
548 })
549 })
550 }
551
552 pub fn fuzzy_search_users(
553 &mut self,
554 query: String,
555 cx: &mut ModelContext<Self>,
556 ) -> Task<Result<Vec<Arc<User>>>> {
557 self.load_users(proto::FuzzySearchUsers { query }, cx)
558 }
559
560 pub fn get_user(
561 &mut self,
562 user_id: u64,
563 cx: &mut ModelContext<Self>,
564 ) -> Task<Result<Arc<User>>> {
565 if let Some(user) = self.users.get(&user_id).cloned() {
566 return cx.foreground().spawn(async move { Ok(user) });
567 }
568
569 let load_users = self.get_users(vec![user_id], cx);
570 cx.spawn(|this, mut cx| async move {
571 load_users.await?;
572 this.update(&mut cx, |this, _| {
573 this.users
574 .get(&user_id)
575 .cloned()
576 .ok_or_else(|| anyhow!("server responded with no users"))
577 })
578 })
579 }
580
581 pub fn current_user(&self) -> Option<Arc<User>> {
582 self.current_user.borrow().clone()
583 }
584
585 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
586 self.current_user.clone()
587 }
588
589 fn load_users(
590 &mut self,
591 request: impl RequestMessage<Response = UsersResponse>,
592 cx: &mut ModelContext<Self>,
593 ) -> Task<Result<Vec<Arc<User>>>> {
594 let client = self.client.clone();
595 let http = self.http.clone();
596 cx.spawn_weak(|this, mut cx| async move {
597 if let Some(rpc) = client.upgrade() {
598 let response = rpc.request(request).await.context("error loading users")?;
599 let users = future::join_all(
600 response
601 .users
602 .into_iter()
603 .map(|user| User::new(user, http.as_ref())),
604 )
605 .await;
606
607 if let Some(this) = this.upgrade(&cx) {
608 this.update(&mut cx, |this, _| {
609 for user in &users {
610 this.users.insert(user.id, user.clone());
611 }
612 });
613 }
614 Ok(users)
615 } else {
616 Ok(Vec::new())
617 }
618 })
619 }
620}
621
622impl User {
623 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
624 Arc::new(User {
625 id: message.id,
626 github_login: message.github_login,
627 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
628 })
629 }
630}
631
632impl Contact {
633 async fn from_proto(
634 contact: proto::Contact,
635 user_store: &ModelHandle<UserStore>,
636 cx: &mut AsyncAppContext,
637 ) -> Result<Self> {
638 let user = user_store
639 .update(cx, |user_store, cx| {
640 user_store.get_user(contact.user_id, cx)
641 })
642 .await?;
643 Ok(Self {
644 user,
645 online: contact.online,
646 busy: contact.busy,
647 })
648 }
649}
650
651async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
652 let mut response = http
653 .get(url, Default::default(), true)
654 .await
655 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
656
657 if !response.status().is_success() {
658 return Err(anyhow!("avatar request failed {:?}", response.status()));
659 }
660
661 let mut body = Vec::new();
662 response
663 .body_mut()
664 .read_to_end(&mut body)
665 .await
666 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
667 let format = image::guess_format(&body)?;
668 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
669 Ok(ImageData::new(image))
670}