1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Default, Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug, PartialEq)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub busy: bool,
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum ContactRequestStatus {
47 None,
48 RequestSent,
49 RequestReceived,
50 RequestAccepted,
51}
52
53pub struct UserStore {
54 users: HashMap<u64, Arc<User>>,
55 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
56 current_user: watch::Receiver<Option<Arc<User>>>,
57 contacts: Vec<Arc<Contact>>,
58 incoming_contact_requests: Vec<Arc<User>>,
59 outgoing_contact_requests: Vec<Arc<User>>,
60 pending_contact_requests: HashMap<u64, usize>,
61 invite_info: Option<InviteInfo>,
62 client: Weak<Client>,
63 http: Arc<dyn HttpClient>,
64 _maintain_contacts: Task<()>,
65 _maintain_current_user: Task<()>,
66}
67
68#[derive(Clone)]
69pub struct InviteInfo {
70 pub count: u32,
71 pub url: Arc<str>,
72}
73
74pub enum Event {
75 Contact {
76 user: Arc<User>,
77 kind: ContactEventKind,
78 },
79 ShowContacts,
80}
81
82#[derive(Clone, Copy)]
83pub enum ContactEventKind {
84 Requested,
85 Accepted,
86 Cancelled,
87}
88
89impl Entity for UserStore {
90 type Event = Event;
91}
92
93enum UpdateContacts {
94 Update(proto::UpdateContacts),
95 Wait(postage::barrier::Sender),
96 Clear(postage::barrier::Sender),
97}
98
99impl UserStore {
100 pub fn new(
101 client: Arc<Client>,
102 http: Arc<dyn HttpClient>,
103 cx: &mut ModelContext<Self>,
104 ) -> Self {
105 let (mut current_user_tx, current_user_rx) = watch::channel();
106 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
107 let rpc_subscriptions = vec![
108 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
109 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
110 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
111 ];
112 Self {
113 users: Default::default(),
114 current_user: current_user_rx,
115 contacts: Default::default(),
116 incoming_contact_requests: Default::default(),
117 outgoing_contact_requests: Default::default(),
118 invite_info: None,
119 client: Arc::downgrade(&client),
120 update_contacts_tx,
121 http,
122 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
123 let _subscriptions = rpc_subscriptions;
124 while let Some(message) = update_contacts_rx.next().await {
125 if let Some(this) = this.upgrade(&cx) {
126 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
127 .log_err()
128 .await;
129 }
130 }
131 }),
132 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
133 let mut status = client.status();
134 while let Some(status) = status.next().await {
135 match status {
136 Status::Connected { .. } => {
137 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
138 let fetch_user = this
139 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
140 .log_err();
141 let fetch_metrics_id =
142 client.request(proto::GetPrivateUserInfo {}).log_err();
143 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
144 if let Some(info) = info {
145 client.telemetry.set_authenticated_user_info(
146 Some(info.metrics_id.clone()),
147 info.staff,
148 );
149 } else {
150 client.telemetry.set_authenticated_user_info(None, false);
151 }
152
153 current_user_tx.send(user).await.ok();
154 }
155 }
156 Status::SignedOut => {
157 current_user_tx.send(None).await.ok();
158 if let Some(this) = this.upgrade(&cx) {
159 this.update(&mut cx, |this, _| this.clear_contacts()).await;
160 }
161 }
162 Status::ConnectionLost => {
163 if let Some(this) = this.upgrade(&cx) {
164 this.update(&mut cx, |this, _| this.clear_contacts()).await;
165 }
166 }
167 _ => {}
168 }
169 }
170 }),
171 pending_contact_requests: Default::default(),
172 }
173 }
174
175 async fn handle_update_invite_info(
176 this: ModelHandle<Self>,
177 message: TypedEnvelope<proto::UpdateInviteInfo>,
178 _: Arc<Client>,
179 mut cx: AsyncAppContext,
180 ) -> Result<()> {
181 this.update(&mut cx, |this, cx| {
182 this.invite_info = Some(InviteInfo {
183 url: Arc::from(message.payload.url),
184 count: message.payload.count,
185 });
186 cx.notify();
187 });
188 Ok(())
189 }
190
191 async fn handle_show_contacts(
192 this: ModelHandle<Self>,
193 _: TypedEnvelope<proto::ShowContacts>,
194 _: Arc<Client>,
195 mut cx: AsyncAppContext,
196 ) -> Result<()> {
197 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
198 Ok(())
199 }
200
201 pub fn invite_info(&self) -> Option<&InviteInfo> {
202 self.invite_info.as_ref()
203 }
204
205 async fn handle_update_contacts(
206 this: ModelHandle<Self>,
207 message: TypedEnvelope<proto::UpdateContacts>,
208 _: Arc<Client>,
209 mut cx: AsyncAppContext,
210 ) -> Result<()> {
211 this.update(&mut cx, |this, _| {
212 this.update_contacts_tx
213 .unbounded_send(UpdateContacts::Update(message.payload))
214 .unwrap();
215 });
216 Ok(())
217 }
218
219 fn update_contacts(
220 &mut self,
221 message: UpdateContacts,
222 cx: &mut ModelContext<Self>,
223 ) -> Task<Result<()>> {
224 match message {
225 UpdateContacts::Wait(barrier) => {
226 drop(barrier);
227 Task::ready(Ok(()))
228 }
229 UpdateContacts::Clear(barrier) => {
230 self.contacts.clear();
231 self.incoming_contact_requests.clear();
232 self.outgoing_contact_requests.clear();
233 drop(barrier);
234 Task::ready(Ok(()))
235 }
236 UpdateContacts::Update(message) => {
237 let mut user_ids = HashSet::default();
238 for contact in &message.contacts {
239 user_ids.insert(contact.user_id);
240 }
241 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
242 user_ids.extend(message.outgoing_requests.iter());
243
244 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
245 cx.spawn(|this, mut cx| async move {
246 load_users.await?;
247
248 // Users are fetched in parallel above and cached in call to get_users
249 // No need to paralellize here
250 let mut updated_contacts = Vec::new();
251 for contact in message.contacts {
252 let should_notify = contact.should_notify;
253 updated_contacts.push((
254 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
255 should_notify,
256 ));
257 }
258
259 let mut incoming_requests = Vec::new();
260 for request in message.incoming_requests {
261 incoming_requests.push({
262 let user = this
263 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
264 .await?;
265 (user, request.should_notify)
266 });
267 }
268
269 let mut outgoing_requests = Vec::new();
270 for requested_user_id in message.outgoing_requests {
271 outgoing_requests.push(
272 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
273 .await?,
274 );
275 }
276
277 let removed_contacts =
278 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
279 let removed_incoming_requests =
280 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
281 let removed_outgoing_requests =
282 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
283
284 this.update(&mut cx, |this, cx| {
285 // Remove contacts
286 this.contacts
287 .retain(|contact| !removed_contacts.contains(&contact.user.id));
288 // Update existing contacts and insert new ones
289 for (updated_contact, should_notify) in updated_contacts {
290 if should_notify {
291 cx.emit(Event::Contact {
292 user: updated_contact.user.clone(),
293 kind: ContactEventKind::Accepted,
294 });
295 }
296 match this.contacts.binary_search_by_key(
297 &&updated_contact.user.github_login,
298 |contact| &contact.user.github_login,
299 ) {
300 Ok(ix) => this.contacts[ix] = updated_contact,
301 Err(ix) => this.contacts.insert(ix, updated_contact),
302 }
303 }
304
305 // Remove incoming contact requests
306 this.incoming_contact_requests.retain(|user| {
307 if removed_incoming_requests.contains(&user.id) {
308 cx.emit(Event::Contact {
309 user: user.clone(),
310 kind: ContactEventKind::Cancelled,
311 });
312 false
313 } else {
314 true
315 }
316 });
317 // Update existing incoming requests and insert new ones
318 for (user, should_notify) in incoming_requests {
319 if should_notify {
320 cx.emit(Event::Contact {
321 user: user.clone(),
322 kind: ContactEventKind::Requested,
323 });
324 }
325
326 match this
327 .incoming_contact_requests
328 .binary_search_by_key(&&user.github_login, |contact| {
329 &contact.github_login
330 }) {
331 Ok(ix) => this.incoming_contact_requests[ix] = user,
332 Err(ix) => this.incoming_contact_requests.insert(ix, user),
333 }
334 }
335
336 // Remove outgoing contact requests
337 this.outgoing_contact_requests
338 .retain(|user| !removed_outgoing_requests.contains(&user.id));
339 // Update existing incoming requests and insert new ones
340 for request in outgoing_requests {
341 match this
342 .outgoing_contact_requests
343 .binary_search_by_key(&&request.github_login, |contact| {
344 &contact.github_login
345 }) {
346 Ok(ix) => this.outgoing_contact_requests[ix] = request,
347 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
348 }
349 }
350
351 cx.notify();
352 });
353
354 Ok(())
355 })
356 }
357 }
358 }
359
360 pub fn contacts(&self) -> &[Arc<Contact>] {
361 &self.contacts
362 }
363
364 pub fn has_contact(&self, user: &Arc<User>) -> bool {
365 self.contacts
366 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
367 .is_ok()
368 }
369
370 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
371 &self.incoming_contact_requests
372 }
373
374 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
375 &self.outgoing_contact_requests
376 }
377
378 pub fn is_contact_request_pending(&self, user: &User) -> bool {
379 self.pending_contact_requests.contains_key(&user.id)
380 }
381
382 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
383 if self
384 .contacts
385 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
386 .is_ok()
387 {
388 ContactRequestStatus::RequestAccepted
389 } else if self
390 .outgoing_contact_requests
391 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
392 .is_ok()
393 {
394 ContactRequestStatus::RequestSent
395 } else if self
396 .incoming_contact_requests
397 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
398 .is_ok()
399 {
400 ContactRequestStatus::RequestReceived
401 } else {
402 ContactRequestStatus::None
403 }
404 }
405
406 pub fn request_contact(
407 &mut self,
408 responder_id: u64,
409 cx: &mut ModelContext<Self>,
410 ) -> Task<Result<()>> {
411 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
412 }
413
414 pub fn remove_contact(
415 &mut self,
416 user_id: u64,
417 cx: &mut ModelContext<Self>,
418 ) -> Task<Result<()>> {
419 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
420 }
421
422 pub fn respond_to_contact_request(
423 &mut self,
424 requester_id: u64,
425 accept: bool,
426 cx: &mut ModelContext<Self>,
427 ) -> Task<Result<()>> {
428 self.perform_contact_request(
429 requester_id,
430 proto::RespondToContactRequest {
431 requester_id,
432 response: if accept {
433 proto::ContactRequestResponse::Accept
434 } else {
435 proto::ContactRequestResponse::Decline
436 } as i32,
437 },
438 cx,
439 )
440 }
441
442 pub fn dismiss_contact_request(
443 &mut self,
444 requester_id: u64,
445 cx: &mut ModelContext<Self>,
446 ) -> Task<Result<()>> {
447 let client = self.client.upgrade();
448 cx.spawn_weak(|_, _| async move {
449 client
450 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
451 .request(proto::RespondToContactRequest {
452 requester_id,
453 response: proto::ContactRequestResponse::Dismiss as i32,
454 })
455 .await?;
456 Ok(())
457 })
458 }
459
460 fn perform_contact_request<T: RequestMessage>(
461 &mut self,
462 user_id: u64,
463 request: T,
464 cx: &mut ModelContext<Self>,
465 ) -> Task<Result<()>> {
466 let client = self.client.upgrade();
467 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
468 cx.notify();
469
470 cx.spawn(|this, mut cx| async move {
471 let response = client
472 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
473 .request(request)
474 .await;
475 this.update(&mut cx, |this, cx| {
476 if let Entry::Occupied(mut request_count) =
477 this.pending_contact_requests.entry(user_id)
478 {
479 *request_count.get_mut() -= 1;
480 if *request_count.get() == 0 {
481 request_count.remove();
482 }
483 }
484 cx.notify();
485 });
486 response?;
487 Ok(())
488 })
489 }
490
491 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
492 let (tx, mut rx) = postage::barrier::channel();
493 self.update_contacts_tx
494 .unbounded_send(UpdateContacts::Clear(tx))
495 .unwrap();
496 async move {
497 rx.next().await;
498 }
499 }
500
501 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
502 let (tx, mut rx) = postage::barrier::channel();
503 self.update_contacts_tx
504 .unbounded_send(UpdateContacts::Wait(tx))
505 .unwrap();
506 async move {
507 rx.next().await;
508 }
509 }
510
511 pub fn get_users(
512 &mut self,
513 user_ids: Vec<u64>,
514 cx: &mut ModelContext<Self>,
515 ) -> Task<Result<Vec<Arc<User>>>> {
516 let mut user_ids_to_fetch = user_ids.clone();
517 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
518
519 cx.spawn(|this, mut cx| async move {
520 if !user_ids_to_fetch.is_empty() {
521 this.update(&mut cx, |this, cx| {
522 this.load_users(
523 proto::GetUsers {
524 user_ids: user_ids_to_fetch,
525 },
526 cx,
527 )
528 })
529 .await?;
530 }
531
532 this.read_with(&cx, |this, _| {
533 user_ids
534 .iter()
535 .map(|user_id| {
536 this.users
537 .get(user_id)
538 .cloned()
539 .ok_or_else(|| anyhow!("user {} not found", user_id))
540 })
541 .collect()
542 })
543 })
544 }
545
546 pub fn fuzzy_search_users(
547 &mut self,
548 query: String,
549 cx: &mut ModelContext<Self>,
550 ) -> Task<Result<Vec<Arc<User>>>> {
551 self.load_users(proto::FuzzySearchUsers { query }, cx)
552 }
553
554 pub fn get_user(
555 &mut self,
556 user_id: u64,
557 cx: &mut ModelContext<Self>,
558 ) -> Task<Result<Arc<User>>> {
559 if let Some(user) = self.users.get(&user_id).cloned() {
560 return cx.foreground().spawn(async move { Ok(user) });
561 }
562
563 let load_users = self.get_users(vec![user_id], cx);
564 cx.spawn(|this, mut cx| async move {
565 load_users.await?;
566 this.update(&mut cx, |this, _| {
567 this.users
568 .get(&user_id)
569 .cloned()
570 .ok_or_else(|| anyhow!("server responded with no users"))
571 })
572 })
573 }
574
575 pub fn current_user(&self) -> Option<Arc<User>> {
576 self.current_user.borrow().clone()
577 }
578
579 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
580 self.current_user.clone()
581 }
582
583 fn load_users(
584 &mut self,
585 request: impl RequestMessage<Response = UsersResponse>,
586 cx: &mut ModelContext<Self>,
587 ) -> Task<Result<Vec<Arc<User>>>> {
588 let client = self.client.clone();
589 let http = self.http.clone();
590 cx.spawn_weak(|this, mut cx| async move {
591 if let Some(rpc) = client.upgrade() {
592 let response = rpc.request(request).await.context("error loading users")?;
593 let users = future::join_all(
594 response
595 .users
596 .into_iter()
597 .map(|user| User::new(user, http.as_ref())),
598 )
599 .await;
600
601 if let Some(this) = this.upgrade(&cx) {
602 this.update(&mut cx, |this, _| {
603 for user in &users {
604 this.users.insert(user.id, user.clone());
605 }
606 });
607 }
608 Ok(users)
609 } else {
610 Ok(Vec::new())
611 }
612 })
613 }
614}
615
616impl User {
617 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
618 Arc::new(User {
619 id: message.id,
620 github_login: message.github_login,
621 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
622 })
623 }
624}
625
626impl Contact {
627 async fn from_proto(
628 contact: proto::Contact,
629 user_store: &ModelHandle<UserStore>,
630 cx: &mut AsyncAppContext,
631 ) -> Result<Self> {
632 let user = user_store
633 .update(cx, |user_store, cx| {
634 user_store.get_user(contact.user_id, cx)
635 })
636 .await?;
637 Ok(Self {
638 user,
639 online: contact.online,
640 busy: contact.busy,
641 })
642 }
643}
644
645async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
646 let mut response = http
647 .get(url, Default::default(), true)
648 .await
649 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
650
651 if !response.status().is_success() {
652 return Err(anyhow!("avatar request failed {:?}", response.status()));
653 }
654
655 let mut body = Vec::new();
656 response
657 .body_mut()
658 .read_to_end(&mut body)
659 .await
660 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
661 let format = image::guess_format(&body)?;
662 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
663 Ok(ImageData::new(image))
664}