1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{prelude::Stream, sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug, PartialEq)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub projects: Vec<ProjectMetadata>,
43}
44
45#[derive(Clone, Debug, PartialEq)]
46pub struct ProjectMetadata {
47 pub id: u64,
48 pub visible_worktree_root_names: Vec<String>,
49 pub guests: BTreeSet<Arc<User>>,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum ContactRequestStatus {
54 None,
55 RequestSent,
56 RequestReceived,
57 RequestAccepted,
58}
59
60pub struct UserStore {
61 users: HashMap<u64, Arc<User>>,
62 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
63 current_user: watch::Receiver<Option<Arc<User>>>,
64 contacts: Vec<Arc<Contact>>,
65 incoming_contact_requests: Vec<Arc<User>>,
66 outgoing_contact_requests: Vec<Arc<User>>,
67 pending_contact_requests: HashMap<u64, usize>,
68 invite_info: Option<InviteInfo>,
69 client: Weak<Client>,
70 http: Arc<dyn HttpClient>,
71 _maintain_contacts: Task<()>,
72 _maintain_current_user: Task<()>,
73}
74
75#[derive(Clone)]
76pub struct InviteInfo {
77 pub count: u32,
78 pub url: Arc<str>,
79}
80
81pub enum Event {
82 Contact {
83 user: Arc<User>,
84 kind: ContactEventKind,
85 },
86 ShowContacts,
87}
88
89#[derive(Clone, Copy)]
90pub enum ContactEventKind {
91 Requested,
92 Accepted,
93 Cancelled,
94}
95
96impl Entity for UserStore {
97 type Event = Event;
98}
99
100enum UpdateContacts {
101 Update(proto::UpdateContacts),
102 Wait(postage::barrier::Sender),
103 Clear(postage::barrier::Sender),
104}
105
106impl UserStore {
107 pub fn new(
108 client: Arc<Client>,
109 http: Arc<dyn HttpClient>,
110 cx: &mut ModelContext<Self>,
111 ) -> Self {
112 let (mut current_user_tx, current_user_rx) = watch::channel();
113 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
114 let rpc_subscriptions = vec![
115 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
116 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
117 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
118 ];
119 Self {
120 users: Default::default(),
121 current_user: current_user_rx,
122 contacts: Default::default(),
123 incoming_contact_requests: Default::default(),
124 outgoing_contact_requests: Default::default(),
125 invite_info: None,
126 client: Arc::downgrade(&client),
127 update_contacts_tx,
128 http,
129 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130 let _subscriptions = rpc_subscriptions;
131 while let Some(message) = update_contacts_rx.next().await {
132 if let Some(this) = this.upgrade(&cx) {
133 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134 .log_err()
135 .await;
136 }
137 }
138 }),
139 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140 let mut status = client.status();
141 while let Some(status) = status.recv().await {
142 match status {
143 Status::Connected { .. } => {
144 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145 let fetch_user = this
146 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
147 .log_err();
148 let fetch_metrics_id =
149 client.request(proto::GetPrivateUserInfo {}).log_err();
150 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
151 client.telemetry.set_metrics_id(info.map(|i| i.metrics_id));
152 client.telemetry.report_event("sign in", Default::default());
153 current_user_tx.send(user).await.ok();
154 }
155 }
156 Status::SignedOut => {
157 current_user_tx.send(None).await.ok();
158 if let Some(this) = this.upgrade(&cx) {
159 this.update(&mut cx, |this, _| this.clear_contacts()).await;
160 }
161 }
162 Status::ConnectionLost => {
163 if let Some(this) = this.upgrade(&cx) {
164 this.update(&mut cx, |this, _| this.clear_contacts()).await;
165 }
166 }
167 _ => {}
168 }
169 }
170 }),
171 pending_contact_requests: Default::default(),
172 }
173 }
174
175 async fn handle_update_invite_info(
176 this: ModelHandle<Self>,
177 message: TypedEnvelope<proto::UpdateInviteInfo>,
178 _: Arc<Client>,
179 mut cx: AsyncAppContext,
180 ) -> Result<()> {
181 this.update(&mut cx, |this, cx| {
182 this.invite_info = Some(InviteInfo {
183 url: Arc::from(message.payload.url),
184 count: message.payload.count,
185 });
186 cx.notify();
187 });
188 Ok(())
189 }
190
191 async fn handle_show_contacts(
192 this: ModelHandle<Self>,
193 _: TypedEnvelope<proto::ShowContacts>,
194 _: Arc<Client>,
195 mut cx: AsyncAppContext,
196 ) -> Result<()> {
197 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
198 Ok(())
199 }
200
201 pub fn invite_info(&self) -> Option<&InviteInfo> {
202 self.invite_info.as_ref()
203 }
204
205 async fn handle_update_contacts(
206 this: ModelHandle<Self>,
207 message: TypedEnvelope<proto::UpdateContacts>,
208 _: Arc<Client>,
209 mut cx: AsyncAppContext,
210 ) -> Result<()> {
211 this.update(&mut cx, |this, _| {
212 this.update_contacts_tx
213 .unbounded_send(UpdateContacts::Update(message.payload))
214 .unwrap();
215 });
216 Ok(())
217 }
218
219 fn update_contacts(
220 &mut self,
221 message: UpdateContacts,
222 cx: &mut ModelContext<Self>,
223 ) -> Task<Result<()>> {
224 match message {
225 UpdateContacts::Wait(barrier) => {
226 drop(barrier);
227 Task::ready(Ok(()))
228 }
229 UpdateContacts::Clear(barrier) => {
230 self.contacts.clear();
231 self.incoming_contact_requests.clear();
232 self.outgoing_contact_requests.clear();
233 drop(barrier);
234 Task::ready(Ok(()))
235 }
236 UpdateContacts::Update(message) => {
237 let mut user_ids = HashSet::default();
238 for contact in &message.contacts {
239 user_ids.insert(contact.user_id);
240 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
241 }
242 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
243 user_ids.extend(message.outgoing_requests.iter());
244
245 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
246 cx.spawn(|this, mut cx| async move {
247 load_users.await?;
248
249 // Users are fetched in parallel above and cached in call to get_users
250 // No need to paralellize here
251 let mut updated_contacts = Vec::new();
252 for contact in message.contacts {
253 let should_notify = contact.should_notify;
254 updated_contacts.push((
255 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
256 should_notify,
257 ));
258 }
259
260 let mut incoming_requests = Vec::new();
261 for request in message.incoming_requests {
262 incoming_requests.push({
263 let user = this
264 .update(&mut cx, |this, cx| {
265 this.fetch_user(request.requester_id, cx)
266 })
267 .await?;
268 (user, request.should_notify)
269 });
270 }
271
272 let mut outgoing_requests = Vec::new();
273 for requested_user_id in message.outgoing_requests {
274 outgoing_requests.push(
275 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
276 .await?,
277 );
278 }
279
280 let removed_contacts =
281 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
282 let removed_incoming_requests =
283 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
284 let removed_outgoing_requests =
285 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
286
287 this.update(&mut cx, |this, cx| {
288 // Remove contacts
289 this.contacts
290 .retain(|contact| !removed_contacts.contains(&contact.user.id));
291 // Update existing contacts and insert new ones
292 for (updated_contact, should_notify) in updated_contacts {
293 if should_notify {
294 cx.emit(Event::Contact {
295 user: updated_contact.user.clone(),
296 kind: ContactEventKind::Accepted,
297 });
298 }
299 match this.contacts.binary_search_by_key(
300 &&updated_contact.user.github_login,
301 |contact| &contact.user.github_login,
302 ) {
303 Ok(ix) => this.contacts[ix] = updated_contact,
304 Err(ix) => this.contacts.insert(ix, updated_contact),
305 }
306 }
307
308 // Remove incoming contact requests
309 this.incoming_contact_requests.retain(|user| {
310 if removed_incoming_requests.contains(&user.id) {
311 cx.emit(Event::Contact {
312 user: user.clone(),
313 kind: ContactEventKind::Cancelled,
314 });
315 false
316 } else {
317 true
318 }
319 });
320 // Update existing incoming requests and insert new ones
321 for (user, should_notify) in incoming_requests {
322 if should_notify {
323 cx.emit(Event::Contact {
324 user: user.clone(),
325 kind: ContactEventKind::Requested,
326 });
327 }
328
329 match this
330 .incoming_contact_requests
331 .binary_search_by_key(&&user.github_login, |contact| {
332 &contact.github_login
333 }) {
334 Ok(ix) => this.incoming_contact_requests[ix] = user,
335 Err(ix) => this.incoming_contact_requests.insert(ix, user),
336 }
337 }
338
339 // Remove outgoing contact requests
340 this.outgoing_contact_requests
341 .retain(|user| !removed_outgoing_requests.contains(&user.id));
342 // Update existing incoming requests and insert new ones
343 for request in outgoing_requests {
344 match this
345 .outgoing_contact_requests
346 .binary_search_by_key(&&request.github_login, |contact| {
347 &contact.github_login
348 }) {
349 Ok(ix) => this.outgoing_contact_requests[ix] = request,
350 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
351 }
352 }
353
354 cx.notify();
355 });
356
357 Ok(())
358 })
359 }
360 }
361 }
362
363 pub fn contacts(&self) -> &[Arc<Contact>] {
364 &self.contacts
365 }
366
367 pub fn has_contact(&self, user: &Arc<User>) -> bool {
368 self.contacts
369 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
370 .is_ok()
371 }
372
373 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
374 &self.incoming_contact_requests
375 }
376
377 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
378 &self.outgoing_contact_requests
379 }
380
381 pub fn is_contact_request_pending(&self, user: &User) -> bool {
382 self.pending_contact_requests.contains_key(&user.id)
383 }
384
385 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
386 if self
387 .contacts
388 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
389 .is_ok()
390 {
391 ContactRequestStatus::RequestAccepted
392 } else if self
393 .outgoing_contact_requests
394 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
395 .is_ok()
396 {
397 ContactRequestStatus::RequestSent
398 } else if self
399 .incoming_contact_requests
400 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
401 .is_ok()
402 {
403 ContactRequestStatus::RequestReceived
404 } else {
405 ContactRequestStatus::None
406 }
407 }
408
409 pub fn request_contact(
410 &mut self,
411 responder_id: u64,
412 cx: &mut ModelContext<Self>,
413 ) -> Task<Result<()>> {
414 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
415 }
416
417 pub fn remove_contact(
418 &mut self,
419 user_id: u64,
420 cx: &mut ModelContext<Self>,
421 ) -> Task<Result<()>> {
422 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
423 }
424
425 pub fn respond_to_contact_request(
426 &mut self,
427 requester_id: u64,
428 accept: bool,
429 cx: &mut ModelContext<Self>,
430 ) -> Task<Result<()>> {
431 self.perform_contact_request(
432 requester_id,
433 proto::RespondToContactRequest {
434 requester_id,
435 response: if accept {
436 proto::ContactRequestResponse::Accept
437 } else {
438 proto::ContactRequestResponse::Decline
439 } as i32,
440 },
441 cx,
442 )
443 }
444
445 pub fn dismiss_contact_request(
446 &mut self,
447 requester_id: u64,
448 cx: &mut ModelContext<Self>,
449 ) -> Task<Result<()>> {
450 let client = self.client.upgrade();
451 cx.spawn_weak(|_, _| async move {
452 client
453 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
454 .request(proto::RespondToContactRequest {
455 requester_id,
456 response: proto::ContactRequestResponse::Dismiss as i32,
457 })
458 .await?;
459 Ok(())
460 })
461 }
462
463 fn perform_contact_request<T: RequestMessage>(
464 &mut self,
465 user_id: u64,
466 request: T,
467 cx: &mut ModelContext<Self>,
468 ) -> Task<Result<()>> {
469 let client = self.client.upgrade();
470 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
471 cx.notify();
472
473 cx.spawn(|this, mut cx| async move {
474 let response = client
475 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
476 .request(request)
477 .await;
478 this.update(&mut cx, |this, cx| {
479 if let Entry::Occupied(mut request_count) =
480 this.pending_contact_requests.entry(user_id)
481 {
482 *request_count.get_mut() -= 1;
483 if *request_count.get() == 0 {
484 request_count.remove();
485 }
486 }
487 cx.notify();
488 });
489 response?;
490 Ok(())
491 })
492 }
493
494 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
495 let (tx, mut rx) = postage::barrier::channel();
496 self.update_contacts_tx
497 .unbounded_send(UpdateContacts::Clear(tx))
498 .unwrap();
499 async move {
500 rx.recv().await;
501 }
502 }
503
504 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
505 let (tx, mut rx) = postage::barrier::channel();
506 self.update_contacts_tx
507 .unbounded_send(UpdateContacts::Wait(tx))
508 .unwrap();
509 async move {
510 rx.recv().await;
511 }
512 }
513
514 pub fn get_users(
515 &mut self,
516 mut user_ids: Vec<u64>,
517 cx: &mut ModelContext<Self>,
518 ) -> Task<Result<()>> {
519 user_ids.retain(|id| !self.users.contains_key(id));
520 if user_ids.is_empty() {
521 Task::ready(Ok(()))
522 } else {
523 let load = self.load_users(proto::GetUsers { user_ids }, cx);
524 cx.foreground().spawn(async move {
525 load.await?;
526 Ok(())
527 })
528 }
529 }
530
531 pub fn fuzzy_search_users(
532 &mut self,
533 query: String,
534 cx: &mut ModelContext<Self>,
535 ) -> Task<Result<Vec<Arc<User>>>> {
536 self.load_users(proto::FuzzySearchUsers { query }, cx)
537 }
538
539 pub fn fetch_user(
540 &mut self,
541 user_id: u64,
542 cx: &mut ModelContext<Self>,
543 ) -> Task<Result<Arc<User>>> {
544 if let Some(user) = self.users.get(&user_id).cloned() {
545 return cx.foreground().spawn(async move { Ok(user) });
546 }
547
548 let load_users = self.get_users(vec![user_id], cx);
549 cx.spawn(|this, mut cx| async move {
550 load_users.await?;
551 this.update(&mut cx, |this, _| {
552 this.users
553 .get(&user_id)
554 .cloned()
555 .ok_or_else(|| anyhow!("server responded with no users"))
556 })
557 })
558 }
559
560 pub fn current_user(&self) -> Option<Arc<User>> {
561 self.current_user.borrow().clone()
562 }
563
564 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
565 self.current_user.clone()
566 }
567
568 fn load_users(
569 &mut self,
570 request: impl RequestMessage<Response = UsersResponse>,
571 cx: &mut ModelContext<Self>,
572 ) -> Task<Result<Vec<Arc<User>>>> {
573 let client = self.client.clone();
574 let http = self.http.clone();
575 cx.spawn_weak(|this, mut cx| async move {
576 if let Some(rpc) = client.upgrade() {
577 let response = rpc.request(request).await.context("error loading users")?;
578 let users = future::join_all(
579 response
580 .users
581 .into_iter()
582 .map(|user| User::new(user, http.as_ref())),
583 )
584 .await;
585
586 if let Some(this) = this.upgrade(&cx) {
587 this.update(&mut cx, |this, _| {
588 for user in &users {
589 this.users.insert(user.id, user.clone());
590 }
591 });
592 }
593 Ok(users)
594 } else {
595 Ok(Vec::new())
596 }
597 })
598 }
599}
600
601impl User {
602 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
603 Arc::new(User {
604 id: message.id,
605 github_login: message.github_login,
606 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
607 })
608 }
609}
610
611impl Contact {
612 async fn from_proto(
613 contact: proto::Contact,
614 user_store: &ModelHandle<UserStore>,
615 cx: &mut AsyncAppContext,
616 ) -> Result<Self> {
617 let user = user_store
618 .update(cx, |user_store, cx| {
619 user_store.fetch_user(contact.user_id, cx)
620 })
621 .await?;
622 let mut projects = Vec::new();
623 for project in contact.projects {
624 let mut guests = BTreeSet::new();
625 for participant_id in project.guests {
626 guests.insert(
627 user_store
628 .update(cx, |user_store, cx| {
629 user_store.fetch_user(participant_id, cx)
630 })
631 .await?,
632 );
633 }
634 projects.push(ProjectMetadata {
635 id: project.id,
636 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
637 guests,
638 });
639 }
640 Ok(Self {
641 user,
642 online: contact.online,
643 projects,
644 })
645 }
646
647 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
648 self.projects
649 .iter()
650 .filter(|project| !project.visible_worktree_root_names.is_empty())
651 }
652}
653
654async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
655 let mut response = http
656 .get(url, Default::default(), true)
657 .await
658 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
659
660 if !response.status().is_success() {
661 return Err(anyhow!("avatar request failed {:?}", response.status()));
662 }
663
664 let mut body = Vec::new();
665 response
666 .body_mut()
667 .read_to_end(&mut body)
668 .await
669 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
670 let format = image::guess_format(&body)?;
671 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
672 Ok(ImageData::new(image))
673}