1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Default, Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug, PartialEq)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub busy: bool,
43}
44
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user is in the list of outgoing contact requests.
    RequestSent,
    /// The user is in the list of incoming contact requests.
    RequestReceived,
    /// The user is in the list of contacts.
    RequestAccepted,
}
52
53pub struct UserStore {
54 users: HashMap<u64, Arc<User>>,
55 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
56 current_user: watch::Receiver<Option<Arc<User>>>,
57 contacts: Vec<Arc<Contact>>,
58 incoming_contact_requests: Vec<Arc<User>>,
59 outgoing_contact_requests: Vec<Arc<User>>,
60 pending_contact_requests: HashMap<u64, usize>,
61 invite_info: Option<InviteInfo>,
62 client: Weak<Client>,
63 http: Arc<dyn HttpClient>,
64 _maintain_contacts: Task<()>,
65 _maintain_current_user: Task<()>,
66}
67
/// Invite information received from the server in an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// Count sent along with the invite URL (presumably the number of invites — confirm).
    pub count: u32,
    /// Invite URL sent by the server.
    pub url: Arc<str>,
}
73
74pub enum Event {
75 Contact {
76 user: Arc<User>,
77 kind: ContactEventKind,
78 },
79 ShowContacts,
80}
81
/// What happened to a contact relationship, carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request arrived.
    Requested,
    /// A contact was added or updated and the server asked for a notification.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
88
89impl Entity for UserStore {
90 type Event = Event;
91}
92
93enum UpdateContacts {
94 Update(proto::UpdateContacts),
95 Wait(postage::barrier::Sender),
96 Clear(postage::barrier::Sender),
97}
98
99impl UserStore {
100 pub fn new(
101 client: Arc<Client>,
102 http: Arc<dyn HttpClient>,
103 cx: &mut ModelContext<Self>,
104 ) -> Self {
105 let (mut current_user_tx, current_user_rx) = watch::channel();
106 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
107 let rpc_subscriptions = vec![
108 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
109 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
110 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
111 ];
112 Self {
113 users: Default::default(),
114 current_user: current_user_rx,
115 contacts: Default::default(),
116 incoming_contact_requests: Default::default(),
117 outgoing_contact_requests: Default::default(),
118 invite_info: None,
119 client: Arc::downgrade(&client),
120 update_contacts_tx,
121 http,
122 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
123 let _subscriptions = rpc_subscriptions;
124 while let Some(message) = update_contacts_rx.next().await {
125 if let Some(this) = this.upgrade(&cx) {
126 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
127 .log_err()
128 .await;
129 }
130 }
131 }),
132 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
133 let mut status = client.status();
134 while let Some(status) = status.next().await {
135 match status {
136 Status::Connected { .. } => {
137 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
138 let fetch_user = this
139 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
140 .log_err();
141 let fetch_metrics_id =
142 client.request(proto::GetPrivateUserInfo {}).log_err();
143 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
144 if let Some(info) = info {
145 client.telemetry.set_authenticated_user_info(
146 Some(info.metrics_id.clone()),
147 info.staff,
148 );
149 } else {
150 client.telemetry.set_authenticated_user_info(None, false);
151 }
152
153 client.telemetry.report_event("sign in", Default::default());
154 current_user_tx.send(user).await.ok();
155 }
156 }
157 Status::SignedOut => {
158 current_user_tx.send(None).await.ok();
159 if let Some(this) = this.upgrade(&cx) {
160 this.update(&mut cx, |this, _| this.clear_contacts()).await;
161 }
162 }
163 Status::ConnectionLost => {
164 if let Some(this) = this.upgrade(&cx) {
165 this.update(&mut cx, |this, _| this.clear_contacts()).await;
166 }
167 }
168 _ => {}
169 }
170 }
171 }),
172 pending_contact_requests: Default::default(),
173 }
174 }
175
176 async fn handle_update_invite_info(
177 this: ModelHandle<Self>,
178 message: TypedEnvelope<proto::UpdateInviteInfo>,
179 _: Arc<Client>,
180 mut cx: AsyncAppContext,
181 ) -> Result<()> {
182 this.update(&mut cx, |this, cx| {
183 this.invite_info = Some(InviteInfo {
184 url: Arc::from(message.payload.url),
185 count: message.payload.count,
186 });
187 cx.notify();
188 });
189 Ok(())
190 }
191
192 async fn handle_show_contacts(
193 this: ModelHandle<Self>,
194 _: TypedEnvelope<proto::ShowContacts>,
195 _: Arc<Client>,
196 mut cx: AsyncAppContext,
197 ) -> Result<()> {
198 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
199 Ok(())
200 }
201
202 pub fn invite_info(&self) -> Option<&InviteInfo> {
203 self.invite_info.as_ref()
204 }
205
206 async fn handle_update_contacts(
207 this: ModelHandle<Self>,
208 message: TypedEnvelope<proto::UpdateContacts>,
209 _: Arc<Client>,
210 mut cx: AsyncAppContext,
211 ) -> Result<()> {
212 this.update(&mut cx, |this, _| {
213 this.update_contacts_tx
214 .unbounded_send(UpdateContacts::Update(message.payload))
215 .unwrap();
216 });
217 Ok(())
218 }
219
220 fn update_contacts(
221 &mut self,
222 message: UpdateContacts,
223 cx: &mut ModelContext<Self>,
224 ) -> Task<Result<()>> {
225 match message {
226 UpdateContacts::Wait(barrier) => {
227 drop(barrier);
228 Task::ready(Ok(()))
229 }
230 UpdateContacts::Clear(barrier) => {
231 self.contacts.clear();
232 self.incoming_contact_requests.clear();
233 self.outgoing_contact_requests.clear();
234 drop(barrier);
235 Task::ready(Ok(()))
236 }
237 UpdateContacts::Update(message) => {
238 let mut user_ids = HashSet::default();
239 for contact in &message.contacts {
240 user_ids.insert(contact.user_id);
241 }
242 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
243 user_ids.extend(message.outgoing_requests.iter());
244
245 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
246 cx.spawn(|this, mut cx| async move {
247 load_users.await?;
248
249 // Users are fetched in parallel above and cached in call to get_users
250 // No need to paralellize here
251 let mut updated_contacts = Vec::new();
252 for contact in message.contacts {
253 let should_notify = contact.should_notify;
254 updated_contacts.push((
255 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
256 should_notify,
257 ));
258 }
259
260 let mut incoming_requests = Vec::new();
261 for request in message.incoming_requests {
262 incoming_requests.push({
263 let user = this
264 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
265 .await?;
266 (user, request.should_notify)
267 });
268 }
269
270 let mut outgoing_requests = Vec::new();
271 for requested_user_id in message.outgoing_requests {
272 outgoing_requests.push(
273 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
274 .await?,
275 );
276 }
277
278 let removed_contacts =
279 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
280 let removed_incoming_requests =
281 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
282 let removed_outgoing_requests =
283 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
284
285 this.update(&mut cx, |this, cx| {
286 // Remove contacts
287 this.contacts
288 .retain(|contact| !removed_contacts.contains(&contact.user.id));
289 // Update existing contacts and insert new ones
290 for (updated_contact, should_notify) in updated_contacts {
291 if should_notify {
292 cx.emit(Event::Contact {
293 user: updated_contact.user.clone(),
294 kind: ContactEventKind::Accepted,
295 });
296 }
297 match this.contacts.binary_search_by_key(
298 &&updated_contact.user.github_login,
299 |contact| &contact.user.github_login,
300 ) {
301 Ok(ix) => this.contacts[ix] = updated_contact,
302 Err(ix) => this.contacts.insert(ix, updated_contact),
303 }
304 }
305
306 // Remove incoming contact requests
307 this.incoming_contact_requests.retain(|user| {
308 if removed_incoming_requests.contains(&user.id) {
309 cx.emit(Event::Contact {
310 user: user.clone(),
311 kind: ContactEventKind::Cancelled,
312 });
313 false
314 } else {
315 true
316 }
317 });
318 // Update existing incoming requests and insert new ones
319 for (user, should_notify) in incoming_requests {
320 if should_notify {
321 cx.emit(Event::Contact {
322 user: user.clone(),
323 kind: ContactEventKind::Requested,
324 });
325 }
326
327 match this
328 .incoming_contact_requests
329 .binary_search_by_key(&&user.github_login, |contact| {
330 &contact.github_login
331 }) {
332 Ok(ix) => this.incoming_contact_requests[ix] = user,
333 Err(ix) => this.incoming_contact_requests.insert(ix, user),
334 }
335 }
336
337 // Remove outgoing contact requests
338 this.outgoing_contact_requests
339 .retain(|user| !removed_outgoing_requests.contains(&user.id));
340 // Update existing incoming requests and insert new ones
341 for request in outgoing_requests {
342 match this
343 .outgoing_contact_requests
344 .binary_search_by_key(&&request.github_login, |contact| {
345 &contact.github_login
346 }) {
347 Ok(ix) => this.outgoing_contact_requests[ix] = request,
348 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
349 }
350 }
351
352 cx.notify();
353 });
354
355 Ok(())
356 })
357 }
358 }
359 }
360
361 pub fn contacts(&self) -> &[Arc<Contact>] {
362 &self.contacts
363 }
364
365 pub fn has_contact(&self, user: &Arc<User>) -> bool {
366 self.contacts
367 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
368 .is_ok()
369 }
370
371 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
372 &self.incoming_contact_requests
373 }
374
375 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
376 &self.outgoing_contact_requests
377 }
378
379 pub fn is_contact_request_pending(&self, user: &User) -> bool {
380 self.pending_contact_requests.contains_key(&user.id)
381 }
382
383 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
384 if self
385 .contacts
386 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
387 .is_ok()
388 {
389 ContactRequestStatus::RequestAccepted
390 } else if self
391 .outgoing_contact_requests
392 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
393 .is_ok()
394 {
395 ContactRequestStatus::RequestSent
396 } else if self
397 .incoming_contact_requests
398 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
399 .is_ok()
400 {
401 ContactRequestStatus::RequestReceived
402 } else {
403 ContactRequestStatus::None
404 }
405 }
406
407 pub fn request_contact(
408 &mut self,
409 responder_id: u64,
410 cx: &mut ModelContext<Self>,
411 ) -> Task<Result<()>> {
412 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
413 }
414
415 pub fn remove_contact(
416 &mut self,
417 user_id: u64,
418 cx: &mut ModelContext<Self>,
419 ) -> Task<Result<()>> {
420 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
421 }
422
423 pub fn respond_to_contact_request(
424 &mut self,
425 requester_id: u64,
426 accept: bool,
427 cx: &mut ModelContext<Self>,
428 ) -> Task<Result<()>> {
429 self.perform_contact_request(
430 requester_id,
431 proto::RespondToContactRequest {
432 requester_id,
433 response: if accept {
434 proto::ContactRequestResponse::Accept
435 } else {
436 proto::ContactRequestResponse::Decline
437 } as i32,
438 },
439 cx,
440 )
441 }
442
443 pub fn dismiss_contact_request(
444 &mut self,
445 requester_id: u64,
446 cx: &mut ModelContext<Self>,
447 ) -> Task<Result<()>> {
448 let client = self.client.upgrade();
449 cx.spawn_weak(|_, _| async move {
450 client
451 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
452 .request(proto::RespondToContactRequest {
453 requester_id,
454 response: proto::ContactRequestResponse::Dismiss as i32,
455 })
456 .await?;
457 Ok(())
458 })
459 }
460
461 fn perform_contact_request<T: RequestMessage>(
462 &mut self,
463 user_id: u64,
464 request: T,
465 cx: &mut ModelContext<Self>,
466 ) -> Task<Result<()>> {
467 let client = self.client.upgrade();
468 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
469 cx.notify();
470
471 cx.spawn(|this, mut cx| async move {
472 let response = client
473 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
474 .request(request)
475 .await;
476 this.update(&mut cx, |this, cx| {
477 if let Entry::Occupied(mut request_count) =
478 this.pending_contact_requests.entry(user_id)
479 {
480 *request_count.get_mut() -= 1;
481 if *request_count.get() == 0 {
482 request_count.remove();
483 }
484 }
485 cx.notify();
486 });
487 response?;
488 Ok(())
489 })
490 }
491
492 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
493 let (tx, mut rx) = postage::barrier::channel();
494 self.update_contacts_tx
495 .unbounded_send(UpdateContacts::Clear(tx))
496 .unwrap();
497 async move {
498 rx.next().await;
499 }
500 }
501
502 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
503 let (tx, mut rx) = postage::barrier::channel();
504 self.update_contacts_tx
505 .unbounded_send(UpdateContacts::Wait(tx))
506 .unwrap();
507 async move {
508 rx.next().await;
509 }
510 }
511
512 pub fn get_users(
513 &mut self,
514 user_ids: Vec<u64>,
515 cx: &mut ModelContext<Self>,
516 ) -> Task<Result<Vec<Arc<User>>>> {
517 let mut user_ids_to_fetch = user_ids.clone();
518 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
519
520 cx.spawn(|this, mut cx| async move {
521 if !user_ids_to_fetch.is_empty() {
522 this.update(&mut cx, |this, cx| {
523 this.load_users(
524 proto::GetUsers {
525 user_ids: user_ids_to_fetch,
526 },
527 cx,
528 )
529 })
530 .await?;
531 }
532
533 this.read_with(&cx, |this, _| {
534 user_ids
535 .iter()
536 .map(|user_id| {
537 this.users
538 .get(user_id)
539 .cloned()
540 .ok_or_else(|| anyhow!("user {} not found", user_id))
541 })
542 .collect()
543 })
544 })
545 }
546
547 pub fn fuzzy_search_users(
548 &mut self,
549 query: String,
550 cx: &mut ModelContext<Self>,
551 ) -> Task<Result<Vec<Arc<User>>>> {
552 self.load_users(proto::FuzzySearchUsers { query }, cx)
553 }
554
555 pub fn get_user(
556 &mut self,
557 user_id: u64,
558 cx: &mut ModelContext<Self>,
559 ) -> Task<Result<Arc<User>>> {
560 if let Some(user) = self.users.get(&user_id).cloned() {
561 return cx.foreground().spawn(async move { Ok(user) });
562 }
563
564 let load_users = self.get_users(vec![user_id], cx);
565 cx.spawn(|this, mut cx| async move {
566 load_users.await?;
567 this.update(&mut cx, |this, _| {
568 this.users
569 .get(&user_id)
570 .cloned()
571 .ok_or_else(|| anyhow!("server responded with no users"))
572 })
573 })
574 }
575
576 pub fn current_user(&self) -> Option<Arc<User>> {
577 self.current_user.borrow().clone()
578 }
579
580 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
581 self.current_user.clone()
582 }
583
584 fn load_users(
585 &mut self,
586 request: impl RequestMessage<Response = UsersResponse>,
587 cx: &mut ModelContext<Self>,
588 ) -> Task<Result<Vec<Arc<User>>>> {
589 let client = self.client.clone();
590 let http = self.http.clone();
591 cx.spawn_weak(|this, mut cx| async move {
592 if let Some(rpc) = client.upgrade() {
593 let response = rpc.request(request).await.context("error loading users")?;
594 let users = future::join_all(
595 response
596 .users
597 .into_iter()
598 .map(|user| User::new(user, http.as_ref())),
599 )
600 .await;
601
602 if let Some(this) = this.upgrade(&cx) {
603 this.update(&mut cx, |this, _| {
604 for user in &users {
605 this.users.insert(user.id, user.clone());
606 }
607 });
608 }
609 Ok(users)
610 } else {
611 Ok(Vec::new())
612 }
613 })
614 }
615}
616
617impl User {
618 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
619 Arc::new(User {
620 id: message.id,
621 github_login: message.github_login,
622 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
623 })
624 }
625}
626
627impl Contact {
628 async fn from_proto(
629 contact: proto::Contact,
630 user_store: &ModelHandle<UserStore>,
631 cx: &mut AsyncAppContext,
632 ) -> Result<Self> {
633 let user = user_store
634 .update(cx, |user_store, cx| {
635 user_store.get_user(contact.user_id, cx)
636 })
637 .await?;
638 Ok(Self {
639 user,
640 online: contact.online,
641 busy: contact.busy,
642 })
643 }
644}
645
646async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
647 let mut response = http
648 .get(url, Default::default(), true)
649 .await
650 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
651
652 if !response.status().is_success() {
653 return Err(anyhow!("avatar request failed {:?}", response.status()));
654 }
655
656 let mut body = Vec::new();
657 response
658 .body_mut()
659 .read_to_end(&mut body)
660 .await
661 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
662 let format = image::guess_format(&body)?;
663 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
664 Ok(ImageData::new(image))
665}