1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use settings::Settings;
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
/// A user known to this client.
#[derive(Default, Debug)]
pub struct User {
    /// Identifier of the user; also the key under which `UserStore` caches it.
    pub id: u64,
    /// The user's GitHub login; the contact lists in `UserStore` are searched by it.
    pub github_login: String,
    /// Decoded avatar image, or `None` when no avatar was loaded.
    pub avatar: Option<Arc<ImageData>>,
}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
37impl Eq for User {}
38
/// A contact entry: a user plus the presence flags received for them.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the `online` field of the contact message it was built from.
    pub online: bool,
    /// Copied from the `busy` field of the contact message it was built from.
    pub busy: bool,
}
45
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
53
/// Model holding the users, contacts and contact requests known to the client.
pub struct UserStore {
    /// Users loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the current user; written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by the GitHub login of their user.
    contacts: Vec<Arc<Contact>>,
    /// Users with an incoming contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users with an outgoing contact request, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; an entry is removed
    /// once its count drops back to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak reference to the RPC client; upgraded whenever a request is made.
    client: Weak<Client>,
    /// HTTP client handed to `User::new` when loading users.
    http: Arc<dyn HttpClient>,
    /// Task that applies queued contact updates; it also owns the RPC
    /// message-handler subscriptions.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<()>,
}
68
/// Invite information taken from an `UpdateInviteInfo` server message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    // NOTE(review): presumably the number of invites left — confirm against the server.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
74
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Emitted while applying a contact update.
    Contact {
        /// The user the event is about.
        user: Arc<User>,
        /// What happened with respect to that user.
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
}
82
/// The kind of change reported by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request flagged `should_notify` was received.
    Requested,
    /// A contact flagged `should_notify` was added or updated.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
89
/// Makes `UserStore` a GPUI entity whose emitted events are of type [`Event`].
impl Entity for UserStore {
    type Event = Event;
}
93
/// Work items queued on `update_contacts_tx` and applied one at a time by
/// `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, which happens once this item is
    /// reached in the queue.
    Wait(postage::barrier::Sender),
    /// Clear the contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
99
100impl UserStore {
101 pub fn new(
102 client: Arc<Client>,
103 http: Arc<dyn HttpClient>,
104 cx: &mut ModelContext<Self>,
105 ) -> Self {
106 let (mut current_user_tx, current_user_rx) = watch::channel();
107 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
108 let rpc_subscriptions = vec![
109 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
110 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
111 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
112 ];
113 Self {
114 users: Default::default(),
115 current_user: current_user_rx,
116 contacts: Default::default(),
117 incoming_contact_requests: Default::default(),
118 outgoing_contact_requests: Default::default(),
119 invite_info: None,
120 client: Arc::downgrade(&client),
121 update_contacts_tx,
122 http,
123 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
124 let _subscriptions = rpc_subscriptions;
125 while let Some(message) = update_contacts_rx.next().await {
126 if let Some(this) = this.upgrade(&cx) {
127 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
128 .log_err()
129 .await;
130 }
131 }
132 }),
133 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
134 let mut status = client.status();
135 while let Some(status) = status.next().await {
136 match status {
137 Status::Connected { .. } => {
138 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
139 let fetch_user = this
140 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
141 .log_err();
142 let fetch_metrics_id =
143 client.request(proto::GetPrivateUserInfo {}).log_err();
144 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
145 client.telemetry.set_authenticated_user_info(
146 info.as_ref().map(|info| info.metrics_id.clone()),
147 info.as_ref().map(|info| info.staff).unwrap_or(false),
148 cx.read(|cx| cx.global::<Settings>().telemetry()),
149 );
150
151 current_user_tx.send(user).await.ok();
152 }
153 }
154 Status::SignedOut => {
155 current_user_tx.send(None).await.ok();
156 if let Some(this) = this.upgrade(&cx) {
157 this.update(&mut cx, |this, _| this.clear_contacts()).await;
158 }
159 }
160 Status::ConnectionLost => {
161 if let Some(this) = this.upgrade(&cx) {
162 this.update(&mut cx, |this, _| this.clear_contacts()).await;
163 }
164 }
165 _ => {}
166 }
167 }
168 }),
169 pending_contact_requests: Default::default(),
170 }
171 }
172
173 async fn handle_update_invite_info(
174 this: ModelHandle<Self>,
175 message: TypedEnvelope<proto::UpdateInviteInfo>,
176 _: Arc<Client>,
177 mut cx: AsyncAppContext,
178 ) -> Result<()> {
179 this.update(&mut cx, |this, cx| {
180 this.invite_info = Some(InviteInfo {
181 url: Arc::from(message.payload.url),
182 count: message.payload.count,
183 });
184 cx.notify();
185 });
186 Ok(())
187 }
188
189 async fn handle_show_contacts(
190 this: ModelHandle<Self>,
191 _: TypedEnvelope<proto::ShowContacts>,
192 _: Arc<Client>,
193 mut cx: AsyncAppContext,
194 ) -> Result<()> {
195 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
196 Ok(())
197 }
198
199 pub fn invite_info(&self) -> Option<&InviteInfo> {
200 self.invite_info.as_ref()
201 }
202
203 async fn handle_update_contacts(
204 this: ModelHandle<Self>,
205 message: TypedEnvelope<proto::UpdateContacts>,
206 _: Arc<Client>,
207 mut cx: AsyncAppContext,
208 ) -> Result<()> {
209 this.update(&mut cx, |this, _| {
210 this.update_contacts_tx
211 .unbounded_send(UpdateContacts::Update(message.payload))
212 .unwrap();
213 });
214 Ok(())
215 }
216
217 fn update_contacts(
218 &mut self,
219 message: UpdateContacts,
220 cx: &mut ModelContext<Self>,
221 ) -> Task<Result<()>> {
222 match message {
223 UpdateContacts::Wait(barrier) => {
224 drop(barrier);
225 Task::ready(Ok(()))
226 }
227 UpdateContacts::Clear(barrier) => {
228 self.contacts.clear();
229 self.incoming_contact_requests.clear();
230 self.outgoing_contact_requests.clear();
231 drop(barrier);
232 Task::ready(Ok(()))
233 }
234 UpdateContacts::Update(message) => {
235 let mut user_ids = HashSet::default();
236 for contact in &message.contacts {
237 user_ids.insert(contact.user_id);
238 }
239 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
240 user_ids.extend(message.outgoing_requests.iter());
241
242 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
243 cx.spawn(|this, mut cx| async move {
244 load_users.await?;
245
246 // Users are fetched in parallel above and cached in call to get_users
247 // No need to paralellize here
248 let mut updated_contacts = Vec::new();
249 for contact in message.contacts {
250 let should_notify = contact.should_notify;
251 updated_contacts.push((
252 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
253 should_notify,
254 ));
255 }
256
257 let mut incoming_requests = Vec::new();
258 for request in message.incoming_requests {
259 incoming_requests.push({
260 let user = this
261 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
262 .await?;
263 (user, request.should_notify)
264 });
265 }
266
267 let mut outgoing_requests = Vec::new();
268 for requested_user_id in message.outgoing_requests {
269 outgoing_requests.push(
270 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
271 .await?,
272 );
273 }
274
275 let removed_contacts =
276 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
277 let removed_incoming_requests =
278 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
279 let removed_outgoing_requests =
280 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
281
282 this.update(&mut cx, |this, cx| {
283 // Remove contacts
284 this.contacts
285 .retain(|contact| !removed_contacts.contains(&contact.user.id));
286 // Update existing contacts and insert new ones
287 for (updated_contact, should_notify) in updated_contacts {
288 if should_notify {
289 cx.emit(Event::Contact {
290 user: updated_contact.user.clone(),
291 kind: ContactEventKind::Accepted,
292 });
293 }
294 match this.contacts.binary_search_by_key(
295 &&updated_contact.user.github_login,
296 |contact| &contact.user.github_login,
297 ) {
298 Ok(ix) => this.contacts[ix] = updated_contact,
299 Err(ix) => this.contacts.insert(ix, updated_contact),
300 }
301 }
302
303 // Remove incoming contact requests
304 this.incoming_contact_requests.retain(|user| {
305 if removed_incoming_requests.contains(&user.id) {
306 cx.emit(Event::Contact {
307 user: user.clone(),
308 kind: ContactEventKind::Cancelled,
309 });
310 false
311 } else {
312 true
313 }
314 });
315 // Update existing incoming requests and insert new ones
316 for (user, should_notify) in incoming_requests {
317 if should_notify {
318 cx.emit(Event::Contact {
319 user: user.clone(),
320 kind: ContactEventKind::Requested,
321 });
322 }
323
324 match this
325 .incoming_contact_requests
326 .binary_search_by_key(&&user.github_login, |contact| {
327 &contact.github_login
328 }) {
329 Ok(ix) => this.incoming_contact_requests[ix] = user,
330 Err(ix) => this.incoming_contact_requests.insert(ix, user),
331 }
332 }
333
334 // Remove outgoing contact requests
335 this.outgoing_contact_requests
336 .retain(|user| !removed_outgoing_requests.contains(&user.id));
337 // Update existing incoming requests and insert new ones
338 for request in outgoing_requests {
339 match this
340 .outgoing_contact_requests
341 .binary_search_by_key(&&request.github_login, |contact| {
342 &contact.github_login
343 }) {
344 Ok(ix) => this.outgoing_contact_requests[ix] = request,
345 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
346 }
347 }
348
349 cx.notify();
350 });
351
352 Ok(())
353 })
354 }
355 }
356 }
357
358 pub fn contacts(&self) -> &[Arc<Contact>] {
359 &self.contacts
360 }
361
362 pub fn has_contact(&self, user: &Arc<User>) -> bool {
363 self.contacts
364 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
365 .is_ok()
366 }
367
368 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
369 &self.incoming_contact_requests
370 }
371
372 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
373 &self.outgoing_contact_requests
374 }
375
376 pub fn is_contact_request_pending(&self, user: &User) -> bool {
377 self.pending_contact_requests.contains_key(&user.id)
378 }
379
380 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
381 if self
382 .contacts
383 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
384 .is_ok()
385 {
386 ContactRequestStatus::RequestAccepted
387 } else if self
388 .outgoing_contact_requests
389 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
390 .is_ok()
391 {
392 ContactRequestStatus::RequestSent
393 } else if self
394 .incoming_contact_requests
395 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
396 .is_ok()
397 {
398 ContactRequestStatus::RequestReceived
399 } else {
400 ContactRequestStatus::None
401 }
402 }
403
404 pub fn request_contact(
405 &mut self,
406 responder_id: u64,
407 cx: &mut ModelContext<Self>,
408 ) -> Task<Result<()>> {
409 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
410 }
411
412 pub fn remove_contact(
413 &mut self,
414 user_id: u64,
415 cx: &mut ModelContext<Self>,
416 ) -> Task<Result<()>> {
417 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
418 }
419
420 pub fn respond_to_contact_request(
421 &mut self,
422 requester_id: u64,
423 accept: bool,
424 cx: &mut ModelContext<Self>,
425 ) -> Task<Result<()>> {
426 self.perform_contact_request(
427 requester_id,
428 proto::RespondToContactRequest {
429 requester_id,
430 response: if accept {
431 proto::ContactRequestResponse::Accept
432 } else {
433 proto::ContactRequestResponse::Decline
434 } as i32,
435 },
436 cx,
437 )
438 }
439
440 pub fn dismiss_contact_request(
441 &mut self,
442 requester_id: u64,
443 cx: &mut ModelContext<Self>,
444 ) -> Task<Result<()>> {
445 let client = self.client.upgrade();
446 cx.spawn_weak(|_, _| async move {
447 client
448 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
449 .request(proto::RespondToContactRequest {
450 requester_id,
451 response: proto::ContactRequestResponse::Dismiss as i32,
452 })
453 .await?;
454 Ok(())
455 })
456 }
457
458 fn perform_contact_request<T: RequestMessage>(
459 &mut self,
460 user_id: u64,
461 request: T,
462 cx: &mut ModelContext<Self>,
463 ) -> Task<Result<()>> {
464 let client = self.client.upgrade();
465 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
466 cx.notify();
467
468 cx.spawn(|this, mut cx| async move {
469 let response = client
470 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
471 .request(request)
472 .await;
473 this.update(&mut cx, |this, cx| {
474 if let Entry::Occupied(mut request_count) =
475 this.pending_contact_requests.entry(user_id)
476 {
477 *request_count.get_mut() -= 1;
478 if *request_count.get() == 0 {
479 request_count.remove();
480 }
481 }
482 cx.notify();
483 });
484 response?;
485 Ok(())
486 })
487 }
488
489 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
490 let (tx, mut rx) = postage::barrier::channel();
491 self.update_contacts_tx
492 .unbounded_send(UpdateContacts::Clear(tx))
493 .unwrap();
494 async move {
495 rx.next().await;
496 }
497 }
498
499 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
500 let (tx, mut rx) = postage::barrier::channel();
501 self.update_contacts_tx
502 .unbounded_send(UpdateContacts::Wait(tx))
503 .unwrap();
504 async move {
505 rx.next().await;
506 }
507 }
508
509 pub fn get_users(
510 &mut self,
511 user_ids: Vec<u64>,
512 cx: &mut ModelContext<Self>,
513 ) -> Task<Result<Vec<Arc<User>>>> {
514 let mut user_ids_to_fetch = user_ids.clone();
515 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
516
517 cx.spawn(|this, mut cx| async move {
518 if !user_ids_to_fetch.is_empty() {
519 this.update(&mut cx, |this, cx| {
520 this.load_users(
521 proto::GetUsers {
522 user_ids: user_ids_to_fetch,
523 },
524 cx,
525 )
526 })
527 .await?;
528 }
529
530 this.read_with(&cx, |this, _| {
531 user_ids
532 .iter()
533 .map(|user_id| {
534 this.users
535 .get(user_id)
536 .cloned()
537 .ok_or_else(|| anyhow!("user {} not found", user_id))
538 })
539 .collect()
540 })
541 })
542 }
543
544 pub fn fuzzy_search_users(
545 &mut self,
546 query: String,
547 cx: &mut ModelContext<Self>,
548 ) -> Task<Result<Vec<Arc<User>>>> {
549 self.load_users(proto::FuzzySearchUsers { query }, cx)
550 }
551
552 pub fn get_user(
553 &mut self,
554 user_id: u64,
555 cx: &mut ModelContext<Self>,
556 ) -> Task<Result<Arc<User>>> {
557 if let Some(user) = self.users.get(&user_id).cloned() {
558 return cx.foreground().spawn(async move { Ok(user) });
559 }
560
561 let load_users = self.get_users(vec![user_id], cx);
562 cx.spawn(|this, mut cx| async move {
563 load_users.await?;
564 this.update(&mut cx, |this, _| {
565 this.users
566 .get(&user_id)
567 .cloned()
568 .ok_or_else(|| anyhow!("server responded with no users"))
569 })
570 })
571 }
572
573 pub fn current_user(&self) -> Option<Arc<User>> {
574 self.current_user.borrow().clone()
575 }
576
577 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
578 self.current_user.clone()
579 }
580
581 fn load_users(
582 &mut self,
583 request: impl RequestMessage<Response = UsersResponse>,
584 cx: &mut ModelContext<Self>,
585 ) -> Task<Result<Vec<Arc<User>>>> {
586 let client = self.client.clone();
587 let http = self.http.clone();
588 cx.spawn_weak(|this, mut cx| async move {
589 if let Some(rpc) = client.upgrade() {
590 let response = rpc.request(request).await.context("error loading users")?;
591 let users = future::join_all(
592 response
593 .users
594 .into_iter()
595 .map(|user| User::new(user, http.as_ref())),
596 )
597 .await;
598
599 if let Some(this) = this.upgrade(&cx) {
600 this.update(&mut cx, |this, _| {
601 for user in &users {
602 this.users.insert(user.id, user.clone());
603 }
604 });
605 }
606 Ok(users)
607 } else {
608 Ok(Vec::new())
609 }
610 })
611 }
612}
613
614impl User {
615 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
616 Arc::new(User {
617 id: message.id,
618 github_login: message.github_login,
619 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
620 })
621 }
622}
623
624impl Contact {
625 async fn from_proto(
626 contact: proto::Contact,
627 user_store: &ModelHandle<UserStore>,
628 cx: &mut AsyncAppContext,
629 ) -> Result<Self> {
630 let user = user_store
631 .update(cx, |user_store, cx| {
632 user_store.get_user(contact.user_id, cx)
633 })
634 .await?;
635 Ok(Self {
636 user,
637 online: contact.online,
638 busy: contact.busy,
639 })
640 }
641}
642
643async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
644 let mut response = http
645 .get(url, Default::default(), true)
646 .await
647 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
648
649 if !response.status().is_success() {
650 return Err(anyhow!("avatar request failed {:?}", response.status()));
651 }
652
653 let mut body = Vec::new();
654 response
655 .body_mut()
656 .read_to_end(&mut body)
657 .await
658 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
659 let format = image::guess_format(&body)?;
660 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
661 Ok(ImageData::new(image))
662}