1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{prelude::Stream, sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug, PartialEq)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub projects: Vec<ProjectMetadata>,
43}
44
45#[derive(Clone, Debug, PartialEq)]
46pub struct ProjectMetadata {
47 pub id: u64,
48 pub visible_worktree_root_names: Vec<String>,
49 pub guests: BTreeSet<Arc<User>>,
50}
51
/// Relationship between the current user and some other user, as derived
/// from the contact and contact-request lists held by `UserStore`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// A contact request to the user is outstanding.
    RequestSent,
    /// A contact request from the user is outstanding.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
59
60pub struct UserStore {
61 users: HashMap<u64, Arc<User>>,
62 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
63 current_user: watch::Receiver<Option<Arc<User>>>,
64 contacts: Vec<Arc<Contact>>,
65 incoming_contact_requests: Vec<Arc<User>>,
66 outgoing_contact_requests: Vec<Arc<User>>,
67 pending_contact_requests: HashMap<u64, usize>,
68 invite_info: Option<InviteInfo>,
69 client: Weak<Client>,
70 http: Arc<dyn HttpClient>,
71 _maintain_contacts: Task<()>,
72 _maintain_current_user: Task<()>,
73}
74
/// Invite information pushed by the server in a `proto::UpdateInviteInfo` message.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so the value
/// can be logged and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InviteInfo {
    /// Invite count as reported by the server.
    pub count: u32,
    /// Invite URL as reported by the server.
    pub url: Arc<str>,
}
80
81pub enum Event {
82 Contact {
83 user: Arc<User>,
84 kind: ContactEventKind,
85 },
86 ShowContacts,
87}
88
/// The kind of change reported by an `Event::Contact`.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so
/// event kinds can be logged and compared.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContactEventKind {
    /// An incoming contact request arrived with its notify flag set.
    Requested,
    /// A contact update arrived with its notify flag set.
    Accepted,
    /// A previously received contact request was removed.
    Cancelled,
}
95
96impl Entity for UserStore {
97 type Event = Event;
98}
99
100enum UpdateContacts {
101 Update(proto::UpdateContacts),
102 Wait(postage::barrier::Sender),
103 Clear(postage::barrier::Sender),
104}
105
106impl UserStore {
107 pub fn new(
108 client: Arc<Client>,
109 http: Arc<dyn HttpClient>,
110 cx: &mut ModelContext<Self>,
111 ) -> Self {
112 let (mut current_user_tx, current_user_rx) = watch::channel();
113 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
114 let rpc_subscriptions = vec![
115 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
116 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
117 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
118 ];
119 Self {
120 users: Default::default(),
121 current_user: current_user_rx,
122 contacts: Default::default(),
123 incoming_contact_requests: Default::default(),
124 outgoing_contact_requests: Default::default(),
125 invite_info: None,
126 client: Arc::downgrade(&client),
127 update_contacts_tx,
128 http,
129 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130 let _subscriptions = rpc_subscriptions;
131 while let Some(message) = update_contacts_rx.next().await {
132 if let Some(this) = this.upgrade(&cx) {
133 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134 .log_err()
135 .await;
136 }
137 }
138 }),
139 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140 let mut status = client.status();
141 while let Some(status) = status.recv().await {
142 match status {
143 Status::Connected { .. } => {
144 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145 let fetch_user = this
146 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
147 .log_err();
148 let fetch_metrics_id =
149 client.request(proto::GetPrivateUserInfo {}).log_err();
150 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
151 if let Some(info) = info {
152 client.telemetry.set_authenticated_user_info(
153 Some(info.metrics_id),
154 info.staff,
155 );
156 } else {
157 client.telemetry.set_authenticated_user_info(None, false);
158 }
159 client.telemetry.report_event("sign in", Default::default());
160 current_user_tx.send(user).await.ok();
161 }
162 }
163 Status::SignedOut => {
164 current_user_tx.send(None).await.ok();
165 if let Some(this) = this.upgrade(&cx) {
166 this.update(&mut cx, |this, _| this.clear_contacts()).await;
167 }
168 }
169 Status::ConnectionLost => {
170 if let Some(this) = this.upgrade(&cx) {
171 this.update(&mut cx, |this, _| this.clear_contacts()).await;
172 }
173 }
174 _ => {}
175 }
176 }
177 }),
178 pending_contact_requests: Default::default(),
179 }
180 }
181
182 async fn handle_update_invite_info(
183 this: ModelHandle<Self>,
184 message: TypedEnvelope<proto::UpdateInviteInfo>,
185 _: Arc<Client>,
186 mut cx: AsyncAppContext,
187 ) -> Result<()> {
188 this.update(&mut cx, |this, cx| {
189 this.invite_info = Some(InviteInfo {
190 url: Arc::from(message.payload.url),
191 count: message.payload.count,
192 });
193 cx.notify();
194 });
195 Ok(())
196 }
197
198 async fn handle_show_contacts(
199 this: ModelHandle<Self>,
200 _: TypedEnvelope<proto::ShowContacts>,
201 _: Arc<Client>,
202 mut cx: AsyncAppContext,
203 ) -> Result<()> {
204 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
205 Ok(())
206 }
207
208 pub fn invite_info(&self) -> Option<&InviteInfo> {
209 self.invite_info.as_ref()
210 }
211
212 async fn handle_update_contacts(
213 this: ModelHandle<Self>,
214 message: TypedEnvelope<proto::UpdateContacts>,
215 _: Arc<Client>,
216 mut cx: AsyncAppContext,
217 ) -> Result<()> {
218 this.update(&mut cx, |this, _| {
219 this.update_contacts_tx
220 .unbounded_send(UpdateContacts::Update(message.payload))
221 .unwrap();
222 });
223 Ok(())
224 }
225
226 fn update_contacts(
227 &mut self,
228 message: UpdateContacts,
229 cx: &mut ModelContext<Self>,
230 ) -> Task<Result<()>> {
231 match message {
232 UpdateContacts::Wait(barrier) => {
233 drop(barrier);
234 Task::ready(Ok(()))
235 }
236 UpdateContacts::Clear(barrier) => {
237 self.contacts.clear();
238 self.incoming_contact_requests.clear();
239 self.outgoing_contact_requests.clear();
240 drop(barrier);
241 Task::ready(Ok(()))
242 }
243 UpdateContacts::Update(message) => {
244 let mut user_ids = HashSet::default();
245 for contact in &message.contacts {
246 user_ids.insert(contact.user_id);
247 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
248 }
249 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
250 user_ids.extend(message.outgoing_requests.iter());
251
252 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
253 cx.spawn(|this, mut cx| async move {
254 load_users.await?;
255
256 // Users are fetched in parallel above and cached in call to get_users
257 // No need to paralellize here
258 let mut updated_contacts = Vec::new();
259 for contact in message.contacts {
260 let should_notify = contact.should_notify;
261 updated_contacts.push((
262 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
263 should_notify,
264 ));
265 }
266
267 let mut incoming_requests = Vec::new();
268 for request in message.incoming_requests {
269 incoming_requests.push({
270 let user = this
271 .update(&mut cx, |this, cx| {
272 this.fetch_user(request.requester_id, cx)
273 })
274 .await?;
275 (user, request.should_notify)
276 });
277 }
278
279 let mut outgoing_requests = Vec::new();
280 for requested_user_id in message.outgoing_requests {
281 outgoing_requests.push(
282 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
283 .await?,
284 );
285 }
286
287 let removed_contacts =
288 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
289 let removed_incoming_requests =
290 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
291 let removed_outgoing_requests =
292 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
293
294 this.update(&mut cx, |this, cx| {
295 // Remove contacts
296 this.contacts
297 .retain(|contact| !removed_contacts.contains(&contact.user.id));
298 // Update existing contacts and insert new ones
299 for (updated_contact, should_notify) in updated_contacts {
300 if should_notify {
301 cx.emit(Event::Contact {
302 user: updated_contact.user.clone(),
303 kind: ContactEventKind::Accepted,
304 });
305 }
306 match this.contacts.binary_search_by_key(
307 &&updated_contact.user.github_login,
308 |contact| &contact.user.github_login,
309 ) {
310 Ok(ix) => this.contacts[ix] = updated_contact,
311 Err(ix) => this.contacts.insert(ix, updated_contact),
312 }
313 }
314
315 // Remove incoming contact requests
316 this.incoming_contact_requests.retain(|user| {
317 if removed_incoming_requests.contains(&user.id) {
318 cx.emit(Event::Contact {
319 user: user.clone(),
320 kind: ContactEventKind::Cancelled,
321 });
322 false
323 } else {
324 true
325 }
326 });
327 // Update existing incoming requests and insert new ones
328 for (user, should_notify) in incoming_requests {
329 if should_notify {
330 cx.emit(Event::Contact {
331 user: user.clone(),
332 kind: ContactEventKind::Requested,
333 });
334 }
335
336 match this
337 .incoming_contact_requests
338 .binary_search_by_key(&&user.github_login, |contact| {
339 &contact.github_login
340 }) {
341 Ok(ix) => this.incoming_contact_requests[ix] = user,
342 Err(ix) => this.incoming_contact_requests.insert(ix, user),
343 }
344 }
345
346 // Remove outgoing contact requests
347 this.outgoing_contact_requests
348 .retain(|user| !removed_outgoing_requests.contains(&user.id));
349 // Update existing incoming requests and insert new ones
350 for request in outgoing_requests {
351 match this
352 .outgoing_contact_requests
353 .binary_search_by_key(&&request.github_login, |contact| {
354 &contact.github_login
355 }) {
356 Ok(ix) => this.outgoing_contact_requests[ix] = request,
357 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
358 }
359 }
360
361 cx.notify();
362 });
363
364 Ok(())
365 })
366 }
367 }
368 }
369
370 pub fn contacts(&self) -> &[Arc<Contact>] {
371 &self.contacts
372 }
373
374 pub fn has_contact(&self, user: &Arc<User>) -> bool {
375 self.contacts
376 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
377 .is_ok()
378 }
379
380 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
381 &self.incoming_contact_requests
382 }
383
384 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
385 &self.outgoing_contact_requests
386 }
387
388 pub fn is_contact_request_pending(&self, user: &User) -> bool {
389 self.pending_contact_requests.contains_key(&user.id)
390 }
391
392 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
393 if self
394 .contacts
395 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
396 .is_ok()
397 {
398 ContactRequestStatus::RequestAccepted
399 } else if self
400 .outgoing_contact_requests
401 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
402 .is_ok()
403 {
404 ContactRequestStatus::RequestSent
405 } else if self
406 .incoming_contact_requests
407 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
408 .is_ok()
409 {
410 ContactRequestStatus::RequestReceived
411 } else {
412 ContactRequestStatus::None
413 }
414 }
415
416 pub fn request_contact(
417 &mut self,
418 responder_id: u64,
419 cx: &mut ModelContext<Self>,
420 ) -> Task<Result<()>> {
421 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
422 }
423
424 pub fn remove_contact(
425 &mut self,
426 user_id: u64,
427 cx: &mut ModelContext<Self>,
428 ) -> Task<Result<()>> {
429 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
430 }
431
432 pub fn respond_to_contact_request(
433 &mut self,
434 requester_id: u64,
435 accept: bool,
436 cx: &mut ModelContext<Self>,
437 ) -> Task<Result<()>> {
438 self.perform_contact_request(
439 requester_id,
440 proto::RespondToContactRequest {
441 requester_id,
442 response: if accept {
443 proto::ContactRequestResponse::Accept
444 } else {
445 proto::ContactRequestResponse::Decline
446 } as i32,
447 },
448 cx,
449 )
450 }
451
452 pub fn dismiss_contact_request(
453 &mut self,
454 requester_id: u64,
455 cx: &mut ModelContext<Self>,
456 ) -> Task<Result<()>> {
457 let client = self.client.upgrade();
458 cx.spawn_weak(|_, _| async move {
459 client
460 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
461 .request(proto::RespondToContactRequest {
462 requester_id,
463 response: proto::ContactRequestResponse::Dismiss as i32,
464 })
465 .await?;
466 Ok(())
467 })
468 }
469
470 fn perform_contact_request<T: RequestMessage>(
471 &mut self,
472 user_id: u64,
473 request: T,
474 cx: &mut ModelContext<Self>,
475 ) -> Task<Result<()>> {
476 let client = self.client.upgrade();
477 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
478 cx.notify();
479
480 cx.spawn(|this, mut cx| async move {
481 let response = client
482 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
483 .request(request)
484 .await;
485 this.update(&mut cx, |this, cx| {
486 if let Entry::Occupied(mut request_count) =
487 this.pending_contact_requests.entry(user_id)
488 {
489 *request_count.get_mut() -= 1;
490 if *request_count.get() == 0 {
491 request_count.remove();
492 }
493 }
494 cx.notify();
495 });
496 response?;
497 Ok(())
498 })
499 }
500
501 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
502 let (tx, mut rx) = postage::barrier::channel();
503 self.update_contacts_tx
504 .unbounded_send(UpdateContacts::Clear(tx))
505 .unwrap();
506 async move {
507 rx.recv().await;
508 }
509 }
510
511 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
512 let (tx, mut rx) = postage::barrier::channel();
513 self.update_contacts_tx
514 .unbounded_send(UpdateContacts::Wait(tx))
515 .unwrap();
516 async move {
517 rx.recv().await;
518 }
519 }
520
521 pub fn get_users(
522 &mut self,
523 mut user_ids: Vec<u64>,
524 cx: &mut ModelContext<Self>,
525 ) -> Task<Result<()>> {
526 user_ids.retain(|id| !self.users.contains_key(id));
527 if user_ids.is_empty() {
528 Task::ready(Ok(()))
529 } else {
530 let load = self.load_users(proto::GetUsers { user_ids }, cx);
531 cx.foreground().spawn(async move {
532 load.await?;
533 Ok(())
534 })
535 }
536 }
537
538 pub fn fuzzy_search_users(
539 &mut self,
540 query: String,
541 cx: &mut ModelContext<Self>,
542 ) -> Task<Result<Vec<Arc<User>>>> {
543 self.load_users(proto::FuzzySearchUsers { query }, cx)
544 }
545
546 pub fn fetch_user(
547 &mut self,
548 user_id: u64,
549 cx: &mut ModelContext<Self>,
550 ) -> Task<Result<Arc<User>>> {
551 if let Some(user) = self.users.get(&user_id).cloned() {
552 return cx.foreground().spawn(async move { Ok(user) });
553 }
554
555 let load_users = self.get_users(vec![user_id], cx);
556 cx.spawn(|this, mut cx| async move {
557 load_users.await?;
558 this.update(&mut cx, |this, _| {
559 this.users
560 .get(&user_id)
561 .cloned()
562 .ok_or_else(|| anyhow!("server responded with no users"))
563 })
564 })
565 }
566
567 pub fn current_user(&self) -> Option<Arc<User>> {
568 self.current_user.borrow().clone()
569 }
570
571 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
572 self.current_user.clone()
573 }
574
575 fn load_users(
576 &mut self,
577 request: impl RequestMessage<Response = UsersResponse>,
578 cx: &mut ModelContext<Self>,
579 ) -> Task<Result<Vec<Arc<User>>>> {
580 let client = self.client.clone();
581 let http = self.http.clone();
582 cx.spawn_weak(|this, mut cx| async move {
583 if let Some(rpc) = client.upgrade() {
584 let response = rpc.request(request).await.context("error loading users")?;
585 let users = future::join_all(
586 response
587 .users
588 .into_iter()
589 .map(|user| User::new(user, http.as_ref())),
590 )
591 .await;
592
593 if let Some(this) = this.upgrade(&cx) {
594 this.update(&mut cx, |this, _| {
595 for user in &users {
596 this.users.insert(user.id, user.clone());
597 }
598 });
599 }
600 Ok(users)
601 } else {
602 Ok(Vec::new())
603 }
604 })
605 }
606}
607
608impl User {
609 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
610 Arc::new(User {
611 id: message.id,
612 github_login: message.github_login,
613 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
614 })
615 }
616}
617
618impl Contact {
619 async fn from_proto(
620 contact: proto::Contact,
621 user_store: &ModelHandle<UserStore>,
622 cx: &mut AsyncAppContext,
623 ) -> Result<Self> {
624 let user = user_store
625 .update(cx, |user_store, cx| {
626 user_store.fetch_user(contact.user_id, cx)
627 })
628 .await?;
629 let mut projects = Vec::new();
630 for project in contact.projects {
631 let mut guests = BTreeSet::new();
632 for participant_id in project.guests {
633 guests.insert(
634 user_store
635 .update(cx, |user_store, cx| {
636 user_store.fetch_user(participant_id, cx)
637 })
638 .await?,
639 );
640 }
641 projects.push(ProjectMetadata {
642 id: project.id,
643 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
644 guests,
645 });
646 }
647 Ok(Self {
648 user,
649 online: contact.online,
650 projects,
651 })
652 }
653
654 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
655 self.projects
656 .iter()
657 .filter(|project| !project.visible_worktree_root_names.is_empty())
658 }
659}
660
661async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
662 let mut response = http
663 .get(url, Default::default(), true)
664 .await
665 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
666
667 if !response.status().is_success() {
668 return Err(anyhow!("avatar request failed {:?}", response.status()));
669 }
670
671 let mut body = Vec::new();
672 response
673 .body_mut()
674 .read_to_end(&mut body)
675 .await
676 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
677 let format = image::guess_format(&body)?;
678 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
679 Ok(ImageData::new(image))
680}