1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::http::HttpClient;
11use util::TryFutureExt as _;
12
13pub type UserId = u64;
14
15#[derive(Default, Debug)]
16pub struct User {
17 pub id: UserId,
18 pub github_login: String,
19 pub avatar: Option<Arc<ImageData>>,
20}
21
22impl PartialOrd for User {
23 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
24 Some(self.cmp(other))
25 }
26}
27
28impl Ord for User {
29 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
30 self.github_login.cmp(&other.github_login)
31 }
32}
33
34impl PartialEq for User {
35 fn eq(&self, other: &Self) -> bool {
36 self.id == other.id && self.github_login == other.github_login
37 }
38}
39
40impl Eq for User {}
41
42#[derive(Debug, PartialEq)]
43pub struct Contact {
44 pub user: Arc<User>,
45 pub online: bool,
46 pub busy: bool,
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum ContactRequestStatus {
51 None,
52 RequestSent,
53 RequestReceived,
54 RequestAccepted,
55}
56
57pub struct UserStore {
58 users: HashMap<u64, Arc<User>>,
59 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
60 current_user: watch::Receiver<Option<Arc<User>>>,
61 contacts: Vec<Arc<Contact>>,
62 incoming_contact_requests: Vec<Arc<User>>,
63 outgoing_contact_requests: Vec<Arc<User>>,
64 pending_contact_requests: HashMap<u64, usize>,
65 invite_info: Option<InviteInfo>,
66 client: Weak<Client>,
67 http: Arc<dyn HttpClient>,
68 _maintain_contacts: Task<()>,
69 _maintain_current_user: Task<()>,
70}
71
72#[derive(Clone)]
73pub struct InviteInfo {
74 pub count: u32,
75 pub url: Arc<str>,
76}
77
78pub enum Event {
79 Contact {
80 user: Arc<User>,
81 kind: ContactEventKind,
82 },
83 ShowContacts,
84}
85
86#[derive(Clone, Copy)]
87pub enum ContactEventKind {
88 Requested,
89 Accepted,
90 Cancelled,
91}
92
93impl Entity for UserStore {
94 type Event = Event;
95}
96
97enum UpdateContacts {
98 Update(proto::UpdateContacts),
99 Wait(postage::barrier::Sender),
100 Clear(postage::barrier::Sender),
101}
102
103impl UserStore {
104 pub fn new(
105 client: Arc<Client>,
106 http: Arc<dyn HttpClient>,
107 cx: &mut ModelContext<Self>,
108 ) -> Self {
109 let (mut current_user_tx, current_user_rx) = watch::channel();
110 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
111 let rpc_subscriptions = vec![
112 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
113 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
114 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
115 ];
116 Self {
117 users: Default::default(),
118 current_user: current_user_rx,
119 contacts: Default::default(),
120 incoming_contact_requests: Default::default(),
121 outgoing_contact_requests: Default::default(),
122 invite_info: None,
123 client: Arc::downgrade(&client),
124 update_contacts_tx,
125 http,
126 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
127 let _subscriptions = rpc_subscriptions;
128 while let Some(message) = update_contacts_rx.next().await {
129 if let Some(this) = this.upgrade(&cx) {
130 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
131 .log_err()
132 .await;
133 }
134 }
135 }),
136 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
137 let mut status = client.status();
138 while let Some(status) = status.next().await {
139 match status {
140 Status::Connected { .. } => {
141 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
142 let fetch_user = this
143 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
144 .log_err();
145 let fetch_metrics_id =
146 client.request(proto::GetPrivateUserInfo {}).log_err();
147 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
148
149 if let Some(info) = info {
150 cx.update(|cx| {
151 cx.update_flags(info.staff, info.flags);
152 client.telemetry.set_authenticated_user_info(
153 Some(info.metrics_id.clone()),
154 info.staff,
155 cx,
156 )
157 });
158 } else {
159 cx.read(|cx| {
160 client
161 .telemetry
162 .set_authenticated_user_info(None, false, cx)
163 });
164 }
165
166 current_user_tx.send(user).await.ok();
167
168 this.update(&mut cx, |_, cx| {
169 cx.notify();
170 });
171 }
172 }
173 Status::SignedOut => {
174 current_user_tx.send(None).await.ok();
175 if let Some(this) = this.upgrade(&cx) {
176 this.update(&mut cx, |this, cx| {
177 cx.notify();
178 this.clear_contacts()
179 })
180 .await;
181 }
182 }
183 Status::ConnectionLost => {
184 if let Some(this) = this.upgrade(&cx) {
185 this.update(&mut cx, |this, cx| {
186 cx.notify();
187 this.clear_contacts()
188 })
189 .await;
190 }
191 }
192 _ => {}
193 }
194 }
195 }),
196 pending_contact_requests: Default::default(),
197 }
198 }
199
200 #[cfg(feature = "test-support")]
201 pub fn clear_cache(&mut self) {
202 self.users.clear();
203 }
204
205 async fn handle_update_invite_info(
206 this: ModelHandle<Self>,
207 message: TypedEnvelope<proto::UpdateInviteInfo>,
208 _: Arc<Client>,
209 mut cx: AsyncAppContext,
210 ) -> Result<()> {
211 this.update(&mut cx, |this, cx| {
212 this.invite_info = Some(InviteInfo {
213 url: Arc::from(message.payload.url),
214 count: message.payload.count,
215 });
216 cx.notify();
217 });
218 Ok(())
219 }
220
221 async fn handle_show_contacts(
222 this: ModelHandle<Self>,
223 _: TypedEnvelope<proto::ShowContacts>,
224 _: Arc<Client>,
225 mut cx: AsyncAppContext,
226 ) -> Result<()> {
227 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
228 Ok(())
229 }
230
231 pub fn invite_info(&self) -> Option<&InviteInfo> {
232 self.invite_info.as_ref()
233 }
234
235 async fn handle_update_contacts(
236 this: ModelHandle<Self>,
237 message: TypedEnvelope<proto::UpdateContacts>,
238 _: Arc<Client>,
239 mut cx: AsyncAppContext,
240 ) -> Result<()> {
241 this.update(&mut cx, |this, _| {
242 this.update_contacts_tx
243 .unbounded_send(UpdateContacts::Update(message.payload))
244 .unwrap();
245 });
246 Ok(())
247 }
248
249 fn update_contacts(
250 &mut self,
251 message: UpdateContacts,
252 cx: &mut ModelContext<Self>,
253 ) -> Task<Result<()>> {
254 match message {
255 UpdateContacts::Wait(barrier) => {
256 drop(barrier);
257 Task::ready(Ok(()))
258 }
259 UpdateContacts::Clear(barrier) => {
260 self.contacts.clear();
261 self.incoming_contact_requests.clear();
262 self.outgoing_contact_requests.clear();
263 drop(barrier);
264 Task::ready(Ok(()))
265 }
266 UpdateContacts::Update(message) => {
267 let mut user_ids = HashSet::default();
268 for contact in &message.contacts {
269 user_ids.insert(contact.user_id);
270 }
271 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
272 user_ids.extend(message.outgoing_requests.iter());
273
274 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
275 cx.spawn(|this, mut cx| async move {
276 load_users.await?;
277
278 // Users are fetched in parallel above and cached in call to get_users
279 // No need to paralellize here
280 let mut updated_contacts = Vec::new();
281 for contact in message.contacts {
282 let should_notify = contact.should_notify;
283 updated_contacts.push((
284 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
285 should_notify,
286 ));
287 }
288
289 let mut incoming_requests = Vec::new();
290 for request in message.incoming_requests {
291 incoming_requests.push({
292 let user = this
293 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
294 .await?;
295 (user, request.should_notify)
296 });
297 }
298
299 let mut outgoing_requests = Vec::new();
300 for requested_user_id in message.outgoing_requests {
301 outgoing_requests.push(
302 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
303 .await?,
304 );
305 }
306
307 let removed_contacts =
308 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
309 let removed_incoming_requests =
310 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
311 let removed_outgoing_requests =
312 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
313
314 this.update(&mut cx, |this, cx| {
315 // Remove contacts
316 this.contacts
317 .retain(|contact| !removed_contacts.contains(&contact.user.id));
318 // Update existing contacts and insert new ones
319 for (updated_contact, should_notify) in updated_contacts {
320 if should_notify {
321 cx.emit(Event::Contact {
322 user: updated_contact.user.clone(),
323 kind: ContactEventKind::Accepted,
324 });
325 }
326 match this.contacts.binary_search_by_key(
327 &&updated_contact.user.github_login,
328 |contact| &contact.user.github_login,
329 ) {
330 Ok(ix) => this.contacts[ix] = updated_contact,
331 Err(ix) => this.contacts.insert(ix, updated_contact),
332 }
333 }
334
335 // Remove incoming contact requests
336 this.incoming_contact_requests.retain(|user| {
337 if removed_incoming_requests.contains(&user.id) {
338 cx.emit(Event::Contact {
339 user: user.clone(),
340 kind: ContactEventKind::Cancelled,
341 });
342 false
343 } else {
344 true
345 }
346 });
347 // Update existing incoming requests and insert new ones
348 for (user, should_notify) in incoming_requests {
349 if should_notify {
350 cx.emit(Event::Contact {
351 user: user.clone(),
352 kind: ContactEventKind::Requested,
353 });
354 }
355
356 match this
357 .incoming_contact_requests
358 .binary_search_by_key(&&user.github_login, |contact| {
359 &contact.github_login
360 }) {
361 Ok(ix) => this.incoming_contact_requests[ix] = user,
362 Err(ix) => this.incoming_contact_requests.insert(ix, user),
363 }
364 }
365
366 // Remove outgoing contact requests
367 this.outgoing_contact_requests
368 .retain(|user| !removed_outgoing_requests.contains(&user.id));
369 // Update existing incoming requests and insert new ones
370 for request in outgoing_requests {
371 match this
372 .outgoing_contact_requests
373 .binary_search_by_key(&&request.github_login, |contact| {
374 &contact.github_login
375 }) {
376 Ok(ix) => this.outgoing_contact_requests[ix] = request,
377 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
378 }
379 }
380
381 cx.notify();
382 });
383
384 Ok(())
385 })
386 }
387 }
388 }
389
390 pub fn contacts(&self) -> &[Arc<Contact>] {
391 &self.contacts
392 }
393
394 pub fn has_contact(&self, user: &Arc<User>) -> bool {
395 self.contacts
396 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
397 .is_ok()
398 }
399
400 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
401 &self.incoming_contact_requests
402 }
403
404 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
405 &self.outgoing_contact_requests
406 }
407
408 pub fn is_contact_request_pending(&self, user: &User) -> bool {
409 self.pending_contact_requests.contains_key(&user.id)
410 }
411
412 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
413 if self
414 .contacts
415 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
416 .is_ok()
417 {
418 ContactRequestStatus::RequestAccepted
419 } else if self
420 .outgoing_contact_requests
421 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
422 .is_ok()
423 {
424 ContactRequestStatus::RequestSent
425 } else if self
426 .incoming_contact_requests
427 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
428 .is_ok()
429 {
430 ContactRequestStatus::RequestReceived
431 } else {
432 ContactRequestStatus::None
433 }
434 }
435
436 pub fn request_contact(
437 &mut self,
438 responder_id: u64,
439 cx: &mut ModelContext<Self>,
440 ) -> Task<Result<()>> {
441 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
442 }
443
444 pub fn remove_contact(
445 &mut self,
446 user_id: u64,
447 cx: &mut ModelContext<Self>,
448 ) -> Task<Result<()>> {
449 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
450 }
451
452 pub fn respond_to_contact_request(
453 &mut self,
454 requester_id: u64,
455 accept: bool,
456 cx: &mut ModelContext<Self>,
457 ) -> Task<Result<()>> {
458 self.perform_contact_request(
459 requester_id,
460 proto::RespondToContactRequest {
461 requester_id,
462 response: if accept {
463 proto::ContactRequestResponse::Accept
464 } else {
465 proto::ContactRequestResponse::Decline
466 } as i32,
467 },
468 cx,
469 )
470 }
471
472 pub fn dismiss_contact_request(
473 &mut self,
474 requester_id: u64,
475 cx: &mut ModelContext<Self>,
476 ) -> Task<Result<()>> {
477 let client = self.client.upgrade();
478 cx.spawn_weak(|_, _| async move {
479 client
480 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
481 .request(proto::RespondToContactRequest {
482 requester_id,
483 response: proto::ContactRequestResponse::Dismiss as i32,
484 })
485 .await?;
486 Ok(())
487 })
488 }
489
490 fn perform_contact_request<T: RequestMessage>(
491 &mut self,
492 user_id: u64,
493 request: T,
494 cx: &mut ModelContext<Self>,
495 ) -> Task<Result<()>> {
496 let client = self.client.upgrade();
497 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
498 cx.notify();
499
500 cx.spawn(|this, mut cx| async move {
501 let response = client
502 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
503 .request(request)
504 .await;
505 this.update(&mut cx, |this, cx| {
506 if let Entry::Occupied(mut request_count) =
507 this.pending_contact_requests.entry(user_id)
508 {
509 *request_count.get_mut() -= 1;
510 if *request_count.get() == 0 {
511 request_count.remove();
512 }
513 }
514 cx.notify();
515 });
516 response?;
517 Ok(())
518 })
519 }
520
521 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
522 let (tx, mut rx) = postage::barrier::channel();
523 self.update_contacts_tx
524 .unbounded_send(UpdateContacts::Clear(tx))
525 .unwrap();
526 async move {
527 rx.next().await;
528 }
529 }
530
531 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
532 let (tx, mut rx) = postage::barrier::channel();
533 self.update_contacts_tx
534 .unbounded_send(UpdateContacts::Wait(tx))
535 .unwrap();
536 async move {
537 rx.next().await;
538 }
539 }
540
541 pub fn get_users(
542 &mut self,
543 user_ids: Vec<u64>,
544 cx: &mut ModelContext<Self>,
545 ) -> Task<Result<Vec<Arc<User>>>> {
546 let mut user_ids_to_fetch = user_ids.clone();
547 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
548
549 cx.spawn(|this, mut cx| async move {
550 if !user_ids_to_fetch.is_empty() {
551 this.update(&mut cx, |this, cx| {
552 this.load_users(
553 proto::GetUsers {
554 user_ids: user_ids_to_fetch,
555 },
556 cx,
557 )
558 })
559 .await?;
560 }
561
562 this.read_with(&cx, |this, _| {
563 user_ids
564 .iter()
565 .map(|user_id| {
566 this.users
567 .get(user_id)
568 .cloned()
569 .ok_or_else(|| anyhow!("user {} not found", user_id))
570 })
571 .collect()
572 })
573 })
574 }
575
576 pub fn fuzzy_search_users(
577 &mut self,
578 query: String,
579 cx: &mut ModelContext<Self>,
580 ) -> Task<Result<Vec<Arc<User>>>> {
581 self.load_users(proto::FuzzySearchUsers { query }, cx)
582 }
583
584 pub fn get_user(
585 &mut self,
586 user_id: u64,
587 cx: &mut ModelContext<Self>,
588 ) -> Task<Result<Arc<User>>> {
589 if let Some(user) = self.users.get(&user_id).cloned() {
590 return cx.foreground().spawn(async move { Ok(user) });
591 }
592
593 let load_users = self.get_users(vec![user_id], cx);
594 cx.spawn(|this, mut cx| async move {
595 load_users.await?;
596 this.update(&mut cx, |this, _| {
597 this.users
598 .get(&user_id)
599 .cloned()
600 .ok_or_else(|| anyhow!("server responded with no users"))
601 })
602 })
603 }
604
605 pub fn current_user(&self) -> Option<Arc<User>> {
606 self.current_user.borrow().clone()
607 }
608
609 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
610 self.current_user.clone()
611 }
612
613 fn load_users(
614 &mut self,
615 request: impl RequestMessage<Response = UsersResponse>,
616 cx: &mut ModelContext<Self>,
617 ) -> Task<Result<Vec<Arc<User>>>> {
618 let client = self.client.clone();
619 let http = self.http.clone();
620 cx.spawn_weak(|this, mut cx| async move {
621 if let Some(rpc) = client.upgrade() {
622 let response = rpc.request(request).await.context("error loading users")?;
623 let users = future::join_all(
624 response
625 .users
626 .into_iter()
627 .map(|user| User::new(user, http.as_ref())),
628 )
629 .await;
630
631 if let Some(this) = this.upgrade(&cx) {
632 this.update(&mut cx, |this, _| {
633 for user in &users {
634 this.users.insert(user.id, user.clone());
635 }
636 });
637 }
638 Ok(users)
639 } else {
640 Ok(Vec::new())
641 }
642 })
643 }
644}
645
646impl User {
647 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
648 Arc::new(User {
649 id: message.id,
650 github_login: message.github_login,
651 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
652 })
653 }
654}
655
656impl Contact {
657 async fn from_proto(
658 contact: proto::Contact,
659 user_store: &ModelHandle<UserStore>,
660 cx: &mut AsyncAppContext,
661 ) -> Result<Self> {
662 let user = user_store
663 .update(cx, |user_store, cx| {
664 user_store.get_user(contact.user_id, cx)
665 })
666 .await?;
667 Ok(Self {
668 user,
669 online: contact.online,
670 busy: contact.busy,
671 })
672 }
673}
674
675async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
676 let mut response = http
677 .get(url, Default::default(), true)
678 .await
679 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
680
681 if !response.status().is_success() {
682 return Err(anyhow!("avatar request failed {:?}", response.status()));
683 }
684
685 let mut body = Vec::new();
686 response
687 .body_mut()
688 .read_to_end(&mut body)
689 .await
690 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
691 let format = image::guess_format(&body)?;
692 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
693 Ok(ImageData::new(image))
694}