1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{prelude::Stream, sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(&other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug, PartialEq)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub projects: Vec<ProjectMetadata>,
43}
44
45#[derive(Clone, Debug, PartialEq)]
46pub struct ProjectMetadata {
47 pub id: u64,
48 pub worktree_root_names: Vec<String>,
49 pub guests: BTreeSet<Arc<User>>,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum ContactRequestStatus {
54 None,
55 RequestSent,
56 RequestReceived,
57 RequestAccepted,
58}
59
60pub struct UserStore {
61 users: HashMap<u64, Arc<User>>,
62 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
63 current_user: watch::Receiver<Option<Arc<User>>>,
64 contacts: Vec<Arc<Contact>>,
65 incoming_contact_requests: Vec<Arc<User>>,
66 outgoing_contact_requests: Vec<Arc<User>>,
67 pending_contact_requests: HashMap<u64, usize>,
68 invite_info: Option<InviteInfo>,
69 client: Weak<Client>,
70 http: Arc<dyn HttpClient>,
71 _maintain_contacts: Task<()>,
72 _maintain_current_user: Task<()>,
73}
74
75#[derive(Clone)]
76pub struct InviteInfo {
77 pub count: u32,
78 pub url: Arc<str>,
79}
80
81pub enum Event {
82 Contact {
83 user: Arc<User>,
84 kind: ContactEventKind,
85 },
86 ShowContacts,
87}
88
89#[derive(Clone, Copy)]
90pub enum ContactEventKind {
91 Requested,
92 Accepted,
93 Cancelled,
94}
95
96impl Entity for UserStore {
97 type Event = Event;
98}
99
100enum UpdateContacts {
101 Update(proto::UpdateContacts),
102 Wait(postage::barrier::Sender),
103 Clear(postage::barrier::Sender),
104}
105
106impl UserStore {
107 pub fn new(
108 client: Arc<Client>,
109 http: Arc<dyn HttpClient>,
110 cx: &mut ModelContext<Self>,
111 ) -> Self {
112 let (mut current_user_tx, current_user_rx) = watch::channel();
113 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
114 let rpc_subscriptions = vec![
115 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
116 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
117 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
118 ];
119 Self {
120 users: Default::default(),
121 current_user: current_user_rx,
122 contacts: Default::default(),
123 incoming_contact_requests: Default::default(),
124 outgoing_contact_requests: Default::default(),
125 invite_info: None,
126 client: Arc::downgrade(&client),
127 update_contacts_tx,
128 http,
129 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130 let _subscriptions = rpc_subscriptions;
131 while let Some(message) = update_contacts_rx.next().await {
132 if let Some(this) = this.upgrade(&cx) {
133 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134 .log_err()
135 .await;
136 }
137 }
138 }),
139 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140 let mut status = client.status();
141 while let Some(status) = status.recv().await {
142 match status {
143 Status::Connected { .. } => {
144 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145 let user = this
146 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
147 .log_err()
148 .await;
149 current_user_tx.send(user).await.ok();
150 }
151 }
152 Status::SignedOut => {
153 current_user_tx.send(None).await.ok();
154 if let Some(this) = this.upgrade(&cx) {
155 this.update(&mut cx, |this, _| this.clear_contacts()).await;
156 }
157 }
158 Status::ConnectionLost => {
159 if let Some(this) = this.upgrade(&cx) {
160 this.update(&mut cx, |this, _| this.clear_contacts()).await;
161 }
162 }
163 _ => {}
164 }
165 }
166 }),
167 pending_contact_requests: Default::default(),
168 }
169 }
170
171 async fn handle_update_invite_info(
172 this: ModelHandle<Self>,
173 message: TypedEnvelope<proto::UpdateInviteInfo>,
174 _: Arc<Client>,
175 mut cx: AsyncAppContext,
176 ) -> Result<()> {
177 this.update(&mut cx, |this, cx| {
178 this.invite_info = Some(InviteInfo {
179 url: Arc::from(message.payload.url),
180 count: message.payload.count,
181 });
182 cx.notify();
183 });
184 Ok(())
185 }
186
187 async fn handle_show_contacts(
188 this: ModelHandle<Self>,
189 _: TypedEnvelope<proto::ShowContacts>,
190 _: Arc<Client>,
191 mut cx: AsyncAppContext,
192 ) -> Result<()> {
193 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
194 Ok(())
195 }
196
197 pub fn invite_info(&self) -> Option<&InviteInfo> {
198 self.invite_info.as_ref()
199 }
200
201 async fn handle_update_contacts(
202 this: ModelHandle<Self>,
203 message: TypedEnvelope<proto::UpdateContacts>,
204 _: Arc<Client>,
205 mut cx: AsyncAppContext,
206 ) -> Result<()> {
207 this.update(&mut cx, |this, _| {
208 this.update_contacts_tx
209 .unbounded_send(UpdateContacts::Update(message.payload))
210 .unwrap();
211 });
212 Ok(())
213 }
214
215 fn update_contacts(
216 &mut self,
217 message: UpdateContacts,
218 cx: &mut ModelContext<Self>,
219 ) -> Task<Result<()>> {
220 match message {
221 UpdateContacts::Wait(barrier) => {
222 drop(barrier);
223 Task::ready(Ok(()))
224 }
225 UpdateContacts::Clear(barrier) => {
226 self.contacts.clear();
227 self.incoming_contact_requests.clear();
228 self.outgoing_contact_requests.clear();
229 drop(barrier);
230 Task::ready(Ok(()))
231 }
232 UpdateContacts::Update(message) => {
233 log::info!(
234 "update contacts on client {}: {:?}",
235 self.client.upgrade().unwrap().id,
236 message
237 );
238 let mut user_ids = HashSet::default();
239 for contact in &message.contacts {
240 user_ids.insert(contact.user_id);
241 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
242 }
243 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
244 user_ids.extend(message.outgoing_requests.iter());
245
246 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
247 cx.spawn(|this, mut cx| async move {
248 load_users.await?;
249
250 // Users are fetched in parallel above and cached in call to get_users
251 // No need to paralellize here
252 let mut updated_contacts = Vec::new();
253 for contact in message.contacts {
254 let should_notify = contact.should_notify;
255 updated_contacts.push((
256 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
257 should_notify,
258 ));
259 }
260
261 let mut incoming_requests = Vec::new();
262 for request in message.incoming_requests {
263 incoming_requests.push({
264 let user = this
265 .update(&mut cx, |this, cx| {
266 this.fetch_user(request.requester_id, cx)
267 })
268 .await?;
269 (user, request.should_notify)
270 });
271 }
272
273 let mut outgoing_requests = Vec::new();
274 for requested_user_id in message.outgoing_requests {
275 outgoing_requests.push(
276 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
277 .await?,
278 );
279 }
280
281 let removed_contacts =
282 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
283 let removed_incoming_requests =
284 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
285 let removed_outgoing_requests =
286 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
287
288 this.update(&mut cx, |this, cx| {
289 // Remove contacts
290 this.contacts
291 .retain(|contact| !removed_contacts.contains(&contact.user.id));
292 // Update existing contacts and insert new ones
293 for (updated_contact, should_notify) in updated_contacts {
294 if should_notify {
295 cx.emit(Event::Contact {
296 user: updated_contact.user.clone(),
297 kind: ContactEventKind::Accepted,
298 });
299 }
300 match this.contacts.binary_search_by_key(
301 &&updated_contact.user.github_login,
302 |contact| &contact.user.github_login,
303 ) {
304 Ok(ix) => this.contacts[ix] = updated_contact,
305 Err(ix) => this.contacts.insert(ix, updated_contact),
306 }
307 }
308
309 // Remove incoming contact requests
310 this.incoming_contact_requests.retain(|user| {
311 if removed_incoming_requests.contains(&user.id) {
312 cx.emit(Event::Contact {
313 user: user.clone(),
314 kind: ContactEventKind::Cancelled,
315 });
316 false
317 } else {
318 true
319 }
320 });
321 // Update existing incoming requests and insert new ones
322 for (user, should_notify) in incoming_requests {
323 if should_notify {
324 cx.emit(Event::Contact {
325 user: user.clone(),
326 kind: ContactEventKind::Requested,
327 });
328 }
329
330 match this
331 .incoming_contact_requests
332 .binary_search_by_key(&&user.github_login, |contact| {
333 &contact.github_login
334 }) {
335 Ok(ix) => this.incoming_contact_requests[ix] = user,
336 Err(ix) => this.incoming_contact_requests.insert(ix, user),
337 }
338 }
339
340 // Remove outgoing contact requests
341 this.outgoing_contact_requests
342 .retain(|user| !removed_outgoing_requests.contains(&user.id));
343 // Update existing incoming requests and insert new ones
344 for request in outgoing_requests {
345 match this
346 .outgoing_contact_requests
347 .binary_search_by_key(&&request.github_login, |contact| {
348 &contact.github_login
349 }) {
350 Ok(ix) => this.outgoing_contact_requests[ix] = request,
351 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
352 }
353 }
354
355 cx.notify();
356 });
357
358 Ok(())
359 })
360 }
361 }
362 }
363
364 pub fn contacts(&self) -> &[Arc<Contact>] {
365 &self.contacts
366 }
367
368 pub fn has_contact(&self, user: &Arc<User>) -> bool {
369 self.contacts
370 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
371 .is_ok()
372 }
373
374 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
375 &self.incoming_contact_requests
376 }
377
378 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
379 &self.outgoing_contact_requests
380 }
381
382 pub fn is_contact_request_pending(&self, user: &User) -> bool {
383 self.pending_contact_requests.contains_key(&user.id)
384 }
385
386 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
387 if self
388 .contacts
389 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
390 .is_ok()
391 {
392 ContactRequestStatus::RequestAccepted
393 } else if self
394 .outgoing_contact_requests
395 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
396 .is_ok()
397 {
398 ContactRequestStatus::RequestSent
399 } else if self
400 .incoming_contact_requests
401 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
402 .is_ok()
403 {
404 ContactRequestStatus::RequestReceived
405 } else {
406 ContactRequestStatus::None
407 }
408 }
409
410 pub fn request_contact(
411 &mut self,
412 responder_id: u64,
413 cx: &mut ModelContext<Self>,
414 ) -> Task<Result<()>> {
415 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
416 }
417
418 pub fn remove_contact(
419 &mut self,
420 user_id: u64,
421 cx: &mut ModelContext<Self>,
422 ) -> Task<Result<()>> {
423 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
424 }
425
426 pub fn respond_to_contact_request(
427 &mut self,
428 requester_id: u64,
429 accept: bool,
430 cx: &mut ModelContext<Self>,
431 ) -> Task<Result<()>> {
432 self.perform_contact_request(
433 requester_id,
434 proto::RespondToContactRequest {
435 requester_id,
436 response: if accept {
437 proto::ContactRequestResponse::Accept
438 } else {
439 proto::ContactRequestResponse::Decline
440 } as i32,
441 },
442 cx,
443 )
444 }
445
446 pub fn dismiss_contact_request(
447 &mut self,
448 requester_id: u64,
449 cx: &mut ModelContext<Self>,
450 ) -> Task<Result<()>> {
451 let client = self.client.upgrade();
452 cx.spawn_weak(|_, _| async move {
453 client
454 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
455 .request(proto::RespondToContactRequest {
456 requester_id,
457 response: proto::ContactRequestResponse::Dismiss as i32,
458 })
459 .await?;
460 Ok(())
461 })
462 }
463
464 fn perform_contact_request<T: RequestMessage>(
465 &mut self,
466 user_id: u64,
467 request: T,
468 cx: &mut ModelContext<Self>,
469 ) -> Task<Result<()>> {
470 let client = self.client.upgrade();
471 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
472 cx.notify();
473
474 cx.spawn(|this, mut cx| async move {
475 let response = client
476 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
477 .request(request)
478 .await;
479 this.update(&mut cx, |this, cx| {
480 if let Entry::Occupied(mut request_count) =
481 this.pending_contact_requests.entry(user_id)
482 {
483 *request_count.get_mut() -= 1;
484 if *request_count.get() == 0 {
485 request_count.remove();
486 }
487 }
488 cx.notify();
489 });
490 response?;
491 Ok(())
492 })
493 }
494
495 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
496 let (tx, mut rx) = postage::barrier::channel();
497 self.update_contacts_tx
498 .unbounded_send(UpdateContacts::Clear(tx))
499 .unwrap();
500 async move {
501 rx.recv().await;
502 }
503 }
504
505 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
506 let (tx, mut rx) = postage::barrier::channel();
507 self.update_contacts_tx
508 .unbounded_send(UpdateContacts::Wait(tx))
509 .unwrap();
510 async move {
511 rx.recv().await;
512 }
513 }
514
515 pub fn get_users(
516 &mut self,
517 mut user_ids: Vec<u64>,
518 cx: &mut ModelContext<Self>,
519 ) -> Task<Result<()>> {
520 user_ids.retain(|id| !self.users.contains_key(id));
521 if user_ids.is_empty() {
522 Task::ready(Ok(()))
523 } else {
524 let load = self.load_users(proto::GetUsers { user_ids }, cx);
525 cx.foreground().spawn(async move {
526 load.await?;
527 Ok(())
528 })
529 }
530 }
531
532 pub fn fuzzy_search_users(
533 &mut self,
534 query: String,
535 cx: &mut ModelContext<Self>,
536 ) -> Task<Result<Vec<Arc<User>>>> {
537 self.load_users(proto::FuzzySearchUsers { query }, cx)
538 }
539
540 pub fn fetch_user(
541 &mut self,
542 user_id: u64,
543 cx: &mut ModelContext<Self>,
544 ) -> Task<Result<Arc<User>>> {
545 if let Some(user) = self.users.get(&user_id).cloned() {
546 return cx.foreground().spawn(async move { Ok(user) });
547 }
548
549 let load_users = self.get_users(vec![user_id], cx);
550 cx.spawn(|this, mut cx| async move {
551 load_users.await?;
552 this.update(&mut cx, |this, _| {
553 this.users
554 .get(&user_id)
555 .cloned()
556 .ok_or_else(|| anyhow!("server responded with no users"))
557 })
558 })
559 }
560
561 pub fn current_user(&self) -> Option<Arc<User>> {
562 self.current_user.borrow().clone()
563 }
564
565 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
566 self.current_user.clone()
567 }
568
569 fn load_users(
570 &mut self,
571 request: impl RequestMessage<Response = UsersResponse>,
572 cx: &mut ModelContext<Self>,
573 ) -> Task<Result<Vec<Arc<User>>>> {
574 let client = self.client.clone();
575 let http = self.http.clone();
576 cx.spawn_weak(|this, mut cx| async move {
577 if let Some(rpc) = client.upgrade() {
578 let response = rpc.request(request).await.context("error loading users")?;
579 let users = future::join_all(
580 response
581 .users
582 .into_iter()
583 .map(|user| User::new(user, http.as_ref())),
584 )
585 .await;
586
587 if let Some(this) = this.upgrade(&cx) {
588 this.update(&mut cx, |this, _| {
589 for user in &users {
590 this.users.insert(user.id, user.clone());
591 }
592 });
593 }
594 Ok(users)
595 } else {
596 Ok(Vec::new())
597 }
598 })
599 }
600}
601
602impl User {
603 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
604 Arc::new(User {
605 id: message.id,
606 github_login: message.github_login,
607 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
608 })
609 }
610}
611
612impl Contact {
613 async fn from_proto(
614 contact: proto::Contact,
615 user_store: &ModelHandle<UserStore>,
616 cx: &mut AsyncAppContext,
617 ) -> Result<Self> {
618 let user = user_store
619 .update(cx, |user_store, cx| {
620 user_store.fetch_user(contact.user_id, cx)
621 })
622 .await?;
623 let mut projects = Vec::new();
624 for project in contact.projects {
625 let mut guests = BTreeSet::new();
626 for participant_id in project.guests {
627 guests.insert(
628 user_store
629 .update(cx, |user_store, cx| {
630 user_store.fetch_user(participant_id, cx)
631 })
632 .await?,
633 );
634 }
635 projects.push(ProjectMetadata {
636 id: project.id,
637 worktree_root_names: project.worktree_root_names.clone(),
638 guests,
639 });
640 }
641 Ok(Self {
642 user,
643 online: contact.online,
644 projects,
645 })
646 }
647
648 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
649 self.projects
650 .iter()
651 .filter(|project| !project.worktree_root_names.is_empty())
652 }
653}
654
655async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
656 let mut response = http
657 .get(url, Default::default(), true)
658 .await
659 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
660
661 if !response.status().is_success() {
662 return Err(anyhow!("avatar request failed {:?}", response.status()));
663 }
664
665 let mut body = Vec::new();
666 response
667 .body_mut()
668 .read_to_end(&mut body)
669 .await
670 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
671 let format = image::guess_format(&body)?;
672 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
673 Ok(ImageData::new(image))
674}