1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{prelude::Stream, sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
/// A user account as known to this client.
#[derive(Debug)]
pub struct User {
    /// Numeric user id (populated from `proto::User::id`).
    pub id: u64,
    /// The user's GitHub login; the user and contact lists in this file are sorted by it.
    pub github_login: String,
    /// Avatar image, when one was loaded for this user.
    pub avatar: Option<Arc<ImageData>>,
}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(&other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
// Marker impl: `User` equality (id + GitHub login) is a full equivalence relation.
impl Eq for User {}
37
/// A contact of the current user, together with presence and project information.
#[derive(Debug)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// Online flag, copied from `proto::Contact::online`.
    pub online: bool,
    /// Projects listed for this contact in the contacts update.
    pub projects: Vec<ProjectMetadata>,
}
44
/// Summary of a project attached to a [`Contact`].
#[derive(Debug)]
pub struct ProjectMetadata {
    /// Project id as sent by the server.
    pub id: u64,
    /// Names of the project's worktree roots; may be empty
    /// (see `Contact::non_empty_projects`).
    pub worktree_root_names: Vec<String>,
    /// Guest users of the project, kept in `User` order.
    pub guests: BTreeSet<Arc<User>>,
}
51
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any tracked contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user appears in the contacts list.
    RequestAccepted,
}
59
/// Model that caches users and tracks the current user, contacts and contact requests.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Queue feeding the `_maintain_contacts` task with contact updates and clears.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch channel holding the current user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact RPCs per user id; entries are removed when they hit 0.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info pushed by the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; must be upgraded before each use.
    client: Weak<Client>,
    /// HTTP client used when loading users (avatar download).
    http: Arc<dyn HttpClient>,
    /// Background task applying queued contact updates; held so it lives with the store.
    _maintain_contacts: Task<()>,
    /// Background task reacting to client status changes; held so it lives with the store.
    _maintain_current_user: Task<()>,
}
74
/// Invite information received through `proto::UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// Invite count as reported by the server.
    // NOTE(review): presumably the number of invites still available; confirm against the server.
    pub count: u32,
    /// Invite URL as reported by the server.
    pub url: Arc<str>,
}
80
/// Event emitted by [`UserStore`] when a contact or contact request changes.
#[derive(Clone)]
pub struct ContactEvent {
    /// The user the event is about.
    pub user: Arc<User>,
    /// What happened with respect to that user.
    pub kind: ContactEventKind,
}
86
/// The kind of change described by a [`ContactEvent`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request flagged `should_notify` was received.
    Requested,
    /// A contact flagged `should_notify` was added or updated.
    Accepted,
    /// A previously received incoming contact request was removed.
    Cancelled,
}
93
/// `UserStore` is a gpui entity whose emitted events are [`ContactEvent`]s.
impl Entity for UserStore {
    type Event = ContactEvent;
}
97
/// Messages processed, in order, by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Clear all contacts and requests; the barrier sender is dropped once that is done.
    Clear(postage::barrier::Sender),
}
102
103impl UserStore {
    /// Creates the store, registers its RPC message handlers and spawns its two
    /// background tasks.
    ///
    /// * `client` - RPC client; only a weak reference is stored on the struct.
    /// * `http` - HTTP client kept for later user loading.
    /// * `cx` - model context used to obtain handles and spawn the tasks.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            // Applies queued contact messages one at a time: each update is awaited
            // before the next message is taken from the channel.
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Moved into the task so the subscriptions stay alive as long as it does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            // Keeps `current_user` and the contact lists in sync with the client status.
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.recv().await {
                    match status {
                        Status::Connected { .. } => {
                            // Needs both a live store and a known user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                let user = this
                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
                                    .log_err()
                                    .await;
                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            // Publish "no user" first, then wait for the contacts to be cleared.
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        Status::ConnectionLost => {
                            // The current user is left untouched; only the contacts are cleared.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
166
167 async fn handle_update_invite_info(
168 this: ModelHandle<Self>,
169 message: TypedEnvelope<proto::UpdateInviteInfo>,
170 _: Arc<Client>,
171 mut cx: AsyncAppContext,
172 ) -> Result<()> {
173 this.update(&mut cx, |this, cx| {
174 this.invite_info = Some(InviteInfo {
175 url: Arc::from(message.payload.url),
176 count: message.payload.count,
177 });
178 cx.notify();
179 });
180 Ok(())
181 }
182
183 pub fn invite_info(&self) -> Option<&InviteInfo> {
184 self.invite_info.as_ref()
185 }
186
187 async fn handle_update_contacts(
188 this: ModelHandle<Self>,
189 message: TypedEnvelope<proto::UpdateContacts>,
190 _: Arc<Client>,
191 mut cx: AsyncAppContext,
192 ) -> Result<()> {
193 this.update(&mut cx, |this, _| {
194 this.update_contacts_tx
195 .unbounded_send(UpdateContacts::Update(message.payload))
196 .unwrap();
197 });
198 Ok(())
199 }
200
201 fn update_contacts(
202 &mut self,
203 message: UpdateContacts,
204 cx: &mut ModelContext<Self>,
205 ) -> Task<Result<()>> {
206 match message {
207 UpdateContacts::Clear(barrier) => {
208 self.contacts.clear();
209 self.incoming_contact_requests.clear();
210 self.outgoing_contact_requests.clear();
211 drop(barrier);
212 Task::ready(Ok(()))
213 }
214 UpdateContacts::Update(message) => {
215 log::info!(
216 "update contacts on client {}: {:?}",
217 self.client.upgrade().unwrap().id,
218 message
219 );
220 let mut user_ids = HashSet::default();
221 for contact in &message.contacts {
222 user_ids.insert(contact.user_id);
223 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
224 }
225 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
226 user_ids.extend(message.outgoing_requests.iter());
227
228 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
229 cx.spawn(|this, mut cx| async move {
230 load_users.await?;
231
232 // Users are fetched in parallel above and cached in call to get_users
233 // No need to paralellize here
234 let mut updated_contacts = Vec::new();
235 for contact in message.contacts {
236 let should_notify = contact.should_notify;
237 updated_contacts.push((
238 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
239 should_notify,
240 ));
241 }
242
243 let mut incoming_requests = Vec::new();
244 for request in message.incoming_requests {
245 incoming_requests.push({
246 let user = this
247 .update(&mut cx, |this, cx| {
248 this.fetch_user(request.requester_id, cx)
249 })
250 .await?;
251 (user, request.should_notify)
252 });
253 }
254
255 let mut outgoing_requests = Vec::new();
256 for requested_user_id in message.outgoing_requests {
257 outgoing_requests.push(
258 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
259 .await?,
260 );
261 }
262
263 let removed_contacts =
264 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
265 let removed_incoming_requests =
266 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
267 let removed_outgoing_requests =
268 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
269
270 this.update(&mut cx, |this, cx| {
271 // Remove contacts
272 this.contacts
273 .retain(|contact| !removed_contacts.contains(&contact.user.id));
274 // Update existing contacts and insert new ones
275 for (updated_contact, should_notify) in updated_contacts {
276 if should_notify {
277 cx.emit(ContactEvent {
278 user: updated_contact.user.clone(),
279 kind: ContactEventKind::Accepted,
280 });
281 }
282 match this.contacts.binary_search_by_key(
283 &&updated_contact.user.github_login,
284 |contact| &contact.user.github_login,
285 ) {
286 Ok(ix) => this.contacts[ix] = updated_contact,
287 Err(ix) => this.contacts.insert(ix, updated_contact),
288 }
289 }
290
291 // Remove incoming contact requests
292 this.incoming_contact_requests.retain(|user| {
293 if removed_incoming_requests.contains(&user.id) {
294 cx.emit(ContactEvent {
295 user: user.clone(),
296 kind: ContactEventKind::Cancelled,
297 });
298 false
299 } else {
300 true
301 }
302 });
303 // Update existing incoming requests and insert new ones
304 for (user, should_notify) in incoming_requests {
305 if should_notify {
306 cx.emit(ContactEvent {
307 user: user.clone(),
308 kind: ContactEventKind::Requested,
309 });
310 }
311
312 match this
313 .incoming_contact_requests
314 .binary_search_by_key(&&user.github_login, |contact| {
315 &contact.github_login
316 }) {
317 Ok(ix) => this.incoming_contact_requests[ix] = user,
318 Err(ix) => this.incoming_contact_requests.insert(ix, user),
319 }
320 }
321
322 // Remove outgoing contact requests
323 this.outgoing_contact_requests
324 .retain(|user| !removed_outgoing_requests.contains(&user.id));
325 // Update existing incoming requests and insert new ones
326 for request in outgoing_requests {
327 match this
328 .outgoing_contact_requests
329 .binary_search_by_key(&&request.github_login, |contact| {
330 &contact.github_login
331 }) {
332 Ok(ix) => this.outgoing_contact_requests[ix] = request,
333 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
334 }
335 }
336
337 cx.notify();
338 });
339
340 Ok(())
341 })
342 }
343 }
344 }
345
346 pub fn contacts(&self) -> &[Arc<Contact>] {
347 &self.contacts
348 }
349
350 pub fn has_contact(&self, user: &Arc<User>) -> bool {
351 self.contacts
352 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
353 .is_ok()
354 }
355
356 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
357 &self.incoming_contact_requests
358 }
359
360 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
361 &self.outgoing_contact_requests
362 }
363
364 pub fn is_contact_request_pending(&self, user: &User) -> bool {
365 self.pending_contact_requests.contains_key(&user.id)
366 }
367
368 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
369 if self
370 .contacts
371 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
372 .is_ok()
373 {
374 ContactRequestStatus::RequestAccepted
375 } else if self
376 .outgoing_contact_requests
377 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
378 .is_ok()
379 {
380 ContactRequestStatus::RequestSent
381 } else if self
382 .incoming_contact_requests
383 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
384 .is_ok()
385 {
386 ContactRequestStatus::RequestReceived
387 } else {
388 ContactRequestStatus::None
389 }
390 }
391
392 pub fn request_contact(
393 &mut self,
394 responder_id: u64,
395 cx: &mut ModelContext<Self>,
396 ) -> Task<Result<()>> {
397 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
398 }
399
400 pub fn remove_contact(
401 &mut self,
402 user_id: u64,
403 cx: &mut ModelContext<Self>,
404 ) -> Task<Result<()>> {
405 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
406 }
407
408 pub fn respond_to_contact_request(
409 &mut self,
410 requester_id: u64,
411 accept: bool,
412 cx: &mut ModelContext<Self>,
413 ) -> Task<Result<()>> {
414 self.perform_contact_request(
415 requester_id,
416 proto::RespondToContactRequest {
417 requester_id,
418 response: if accept {
419 proto::ContactRequestResponse::Accept
420 } else {
421 proto::ContactRequestResponse::Decline
422 } as i32,
423 },
424 cx,
425 )
426 }
427
428 pub fn dismiss_contact_request(
429 &mut self,
430 requester_id: u64,
431 cx: &mut ModelContext<Self>,
432 ) -> Task<Result<()>> {
433 let client = self.client.upgrade();
434 cx.spawn_weak(|_, _| async move {
435 client
436 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
437 .request(proto::RespondToContactRequest {
438 requester_id,
439 response: proto::ContactRequestResponse::Dismiss as i32,
440 })
441 .await?;
442 Ok(())
443 })
444 }
445
446 fn perform_contact_request<T: RequestMessage>(
447 &mut self,
448 user_id: u64,
449 request: T,
450 cx: &mut ModelContext<Self>,
451 ) -> Task<Result<()>> {
452 let client = self.client.upgrade();
453 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
454 cx.notify();
455
456 cx.spawn(|this, mut cx| async move {
457 let response = client
458 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
459 .request(request)
460 .await;
461 this.update(&mut cx, |this, cx| {
462 if let Entry::Occupied(mut request_count) =
463 this.pending_contact_requests.entry(user_id)
464 {
465 *request_count.get_mut() -= 1;
466 if *request_count.get() == 0 {
467 request_count.remove();
468 }
469 }
470 cx.notify();
471 });
472 response?;
473 Ok(())
474 })
475 }
476
477 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
478 let (tx, mut rx) = postage::barrier::channel();
479 self.update_contacts_tx
480 .unbounded_send(UpdateContacts::Clear(tx))
481 .unwrap();
482 async move {
483 rx.recv().await;
484 }
485 }
486
487 pub fn get_users(
488 &mut self,
489 mut user_ids: Vec<u64>,
490 cx: &mut ModelContext<Self>,
491 ) -> Task<Result<()>> {
492 user_ids.retain(|id| !self.users.contains_key(id));
493 if user_ids.is_empty() {
494 Task::ready(Ok(()))
495 } else {
496 let load = self.load_users(proto::GetUsers { user_ids }, cx);
497 cx.foreground().spawn(async move {
498 load.await?;
499 Ok(())
500 })
501 }
502 }
503
504 pub fn fuzzy_search_users(
505 &mut self,
506 query: String,
507 cx: &mut ModelContext<Self>,
508 ) -> Task<Result<Vec<Arc<User>>>> {
509 self.load_users(proto::FuzzySearchUsers { query }, cx)
510 }
511
512 pub fn fetch_user(
513 &mut self,
514 user_id: u64,
515 cx: &mut ModelContext<Self>,
516 ) -> Task<Result<Arc<User>>> {
517 if let Some(user) = self.users.get(&user_id).cloned() {
518 return cx.foreground().spawn(async move { Ok(user) });
519 }
520
521 let load_users = self.get_users(vec![user_id], cx);
522 cx.spawn(|this, mut cx| async move {
523 load_users.await?;
524 this.update(&mut cx, |this, _| {
525 this.users
526 .get(&user_id)
527 .cloned()
528 .ok_or_else(|| anyhow!("server responded with no users"))
529 })
530 })
531 }
532
533 pub fn current_user(&self) -> Option<Arc<User>> {
534 self.current_user.borrow().clone()
535 }
536
537 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
538 self.current_user.clone()
539 }
540
541 fn load_users(
542 &mut self,
543 request: impl RequestMessage<Response = UsersResponse>,
544 cx: &mut ModelContext<Self>,
545 ) -> Task<Result<Vec<Arc<User>>>> {
546 let client = self.client.clone();
547 let http = self.http.clone();
548 cx.spawn_weak(|this, mut cx| async move {
549 if let Some(rpc) = client.upgrade() {
550 let response = rpc.request(request).await.context("error loading users")?;
551 let users = future::join_all(
552 response
553 .users
554 .into_iter()
555 .map(|user| User::new(user, http.as_ref())),
556 )
557 .await;
558
559 if let Some(this) = this.upgrade(&cx) {
560 this.update(&mut cx, |this, _| {
561 for user in &users {
562 this.users.insert(user.id, user.clone());
563 }
564 });
565 }
566 Ok(users)
567 } else {
568 Ok(Vec::new())
569 }
570 })
571 }
572}
573
574impl User {
575 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
576 Arc::new(User {
577 id: message.id,
578 github_login: message.github_login,
579 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
580 })
581 }
582}
583
584impl Contact {
585 async fn from_proto(
586 contact: proto::Contact,
587 user_store: &ModelHandle<UserStore>,
588 cx: &mut AsyncAppContext,
589 ) -> Result<Self> {
590 let user = user_store
591 .update(cx, |user_store, cx| {
592 user_store.fetch_user(contact.user_id, cx)
593 })
594 .await?;
595 let mut projects = Vec::new();
596 for project in contact.projects {
597 let mut guests = BTreeSet::new();
598 for participant_id in project.guests {
599 guests.insert(
600 user_store
601 .update(cx, |user_store, cx| {
602 user_store.fetch_user(participant_id, cx)
603 })
604 .await?,
605 );
606 }
607 projects.push(ProjectMetadata {
608 id: project.id,
609 worktree_root_names: project.worktree_root_names.clone(),
610 guests,
611 });
612 }
613 Ok(Self {
614 user,
615 online: contact.online,
616 projects,
617 })
618 }
619
620 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
621 self.projects
622 .iter()
623 .filter(|project| !project.worktree_root_names.is_empty())
624 }
625}
626
627async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
628 let mut response = http
629 .get(url, Default::default(), true)
630 .await
631 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
632
633 if !response.status().is_success() {
634 return Err(anyhow!("avatar request failed {:?}", response.status()));
635 }
636
637 let mut body = Vec::new();
638 response
639 .body_mut()
640 .read_to_end(&mut body)
641 .await
642 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
643 let format = image::guess_format(&body)?;
644 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
645 Ok(ImageData::new(image))
646}