1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{prelude::Stream, sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(&other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub projects: Vec<ProjectMetadata>,
43}
44
45#[derive(Debug)]
46pub struct ProjectMetadata {
47 pub id: u64,
48 pub worktree_root_names: Vec<String>,
49 pub guests: BTreeSet<Arc<User>>,
50}
51
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
59
60pub struct UserStore {
61 users: HashMap<u64, Arc<User>>,
62 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
63 current_user: watch::Receiver<Option<Arc<User>>>,
64 contacts: Vec<Arc<Contact>>,
65 incoming_contact_requests: Vec<Arc<User>>,
66 outgoing_contact_requests: Vec<Arc<User>>,
67 pending_contact_requests: HashMap<u64, usize>,
68 client: Weak<Client>,
69 http: Arc<dyn HttpClient>,
70 _maintain_contacts: Task<()>,
71 _maintain_current_user: Task<()>,
72}
73
74#[derive(Clone)]
75pub struct ContactEvent {
76 pub user: Arc<User>,
77 pub kind: ContactEventKind,
78}
79
/// The kind of change a `ContactEvent` reports.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request flagged for notification arrived.
    Requested,
    /// A contact flagged for notification was added or updated.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
86
87impl Entity for UserStore {
88 type Event = ContactEvent;
89}
90
91enum UpdateContacts {
92 Update(proto::UpdateContacts),
93 Clear(postage::barrier::Sender),
94}
95
96impl UserStore {
97 pub fn new(
98 client: Arc<Client>,
99 http: Arc<dyn HttpClient>,
100 cx: &mut ModelContext<Self>,
101 ) -> Self {
102 let (mut current_user_tx, current_user_rx) = watch::channel();
103 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
104 let rpc_subscription =
105 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
106 Self {
107 users: Default::default(),
108 current_user: current_user_rx,
109 contacts: Default::default(),
110 incoming_contact_requests: Default::default(),
111 outgoing_contact_requests: Default::default(),
112 client: Arc::downgrade(&client),
113 update_contacts_tx,
114 http,
115 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
116 let _subscription = rpc_subscription;
117 while let Some(message) = update_contacts_rx.next().await {
118 if let Some(this) = this.upgrade(&cx) {
119 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
120 .log_err()
121 .await;
122 }
123 }
124 }),
125 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
126 let mut status = client.status();
127 while let Some(status) = status.recv().await {
128 match status {
129 Status::Connected { .. } => {
130 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
131 let user = this
132 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
133 .log_err()
134 .await;
135 current_user_tx.send(user).await.ok();
136 }
137 }
138 Status::SignedOut => {
139 current_user_tx.send(None).await.ok();
140 if let Some(this) = this.upgrade(&cx) {
141 this.update(&mut cx, |this, _| this.clear_contacts()).await;
142 }
143 }
144 Status::ConnectionLost => {
145 if let Some(this) = this.upgrade(&cx) {
146 this.update(&mut cx, |this, _| this.clear_contacts()).await;
147 }
148 }
149 _ => {}
150 }
151 }
152 }),
153 pending_contact_requests: Default::default(),
154 }
155 }
156
157 async fn handle_update_contacts(
158 this: ModelHandle<Self>,
159 msg: TypedEnvelope<proto::UpdateContacts>,
160 _: Arc<Client>,
161 mut cx: AsyncAppContext,
162 ) -> Result<()> {
163 this.update(&mut cx, |this, _| {
164 this.update_contacts_tx
165 .unbounded_send(UpdateContacts::Update(msg.payload))
166 .unwrap();
167 });
168 Ok(())
169 }
170
171 fn update_contacts(
172 &mut self,
173 message: UpdateContacts,
174 cx: &mut ModelContext<Self>,
175 ) -> Task<Result<()>> {
176 match message {
177 UpdateContacts::Clear(barrier) => {
178 self.contacts.clear();
179 self.incoming_contact_requests.clear();
180 self.outgoing_contact_requests.clear();
181 drop(barrier);
182 Task::ready(Ok(()))
183 }
184 UpdateContacts::Update(message) => {
185 log::info!(
186 "update contacts on client {}: {:?}",
187 self.client.upgrade().unwrap().id,
188 message
189 );
190 let mut user_ids = HashSet::default();
191 for contact in &message.contacts {
192 user_ids.insert(contact.user_id);
193 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
194 }
195 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
196 user_ids.extend(message.outgoing_requests.iter());
197
198 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
199 cx.spawn(|this, mut cx| async move {
200 load_users.await?;
201
202 // Users are fetched in parallel above and cached in call to get_users
203 // No need to paralellize here
204 let mut updated_contacts = Vec::new();
205 for contact in message.contacts {
206 let should_notify = contact.should_notify;
207 updated_contacts.push((
208 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
209 should_notify,
210 ));
211 }
212
213 let mut incoming_requests = Vec::new();
214 for request in message.incoming_requests {
215 incoming_requests.push({
216 let user = this
217 .update(&mut cx, |this, cx| {
218 this.fetch_user(request.requester_id, cx)
219 })
220 .await?;
221 (user, request.should_notify)
222 });
223 }
224
225 let mut outgoing_requests = Vec::new();
226 for requested_user_id in message.outgoing_requests {
227 outgoing_requests.push(
228 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
229 .await?,
230 );
231 }
232
233 let removed_contacts =
234 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
235 let removed_incoming_requests =
236 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
237 let removed_outgoing_requests =
238 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
239
240 this.update(&mut cx, |this, cx| {
241 // Remove contacts
242 this.contacts
243 .retain(|contact| !removed_contacts.contains(&contact.user.id));
244 // Update existing contacts and insert new ones
245 for (updated_contact, should_notify) in updated_contacts {
246 if should_notify {
247 cx.emit(ContactEvent {
248 user: updated_contact.user.clone(),
249 kind: ContactEventKind::Accepted,
250 });
251 }
252 match this.contacts.binary_search_by_key(
253 &&updated_contact.user.github_login,
254 |contact| &contact.user.github_login,
255 ) {
256 Ok(ix) => this.contacts[ix] = updated_contact,
257 Err(ix) => this.contacts.insert(ix, updated_contact),
258 }
259 }
260
261 // Remove incoming contact requests
262 this.incoming_contact_requests.retain(|user| {
263 if removed_incoming_requests.contains(&user.id) {
264 cx.emit(ContactEvent {
265 user: user.clone(),
266 kind: ContactEventKind::Cancelled,
267 });
268 false
269 } else {
270 true
271 }
272 });
273 // Update existing incoming requests and insert new ones
274 for (user, should_notify) in incoming_requests {
275 if should_notify {
276 cx.emit(ContactEvent {
277 user: user.clone(),
278 kind: ContactEventKind::Requested,
279 });
280 }
281
282 match this
283 .incoming_contact_requests
284 .binary_search_by_key(&&user.github_login, |contact| {
285 &contact.github_login
286 }) {
287 Ok(ix) => this.incoming_contact_requests[ix] = user,
288 Err(ix) => this.incoming_contact_requests.insert(ix, user),
289 }
290 }
291
292 // Remove outgoing contact requests
293 this.outgoing_contact_requests
294 .retain(|user| !removed_outgoing_requests.contains(&user.id));
295 // Update existing incoming requests and insert new ones
296 for request in outgoing_requests {
297 match this
298 .outgoing_contact_requests
299 .binary_search_by_key(&&request.github_login, |contact| {
300 &contact.github_login
301 }) {
302 Ok(ix) => this.outgoing_contact_requests[ix] = request,
303 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
304 }
305 }
306
307 cx.notify();
308 });
309
310 Ok(())
311 })
312 }
313 }
314 }
315
316 pub fn contacts(&self) -> &[Arc<Contact>] {
317 &self.contacts
318 }
319
320 pub fn has_contact(&self, user: &Arc<User>) -> bool {
321 self.contacts
322 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
323 .is_ok()
324 }
325
326 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
327 &self.incoming_contact_requests
328 }
329
330 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
331 &self.outgoing_contact_requests
332 }
333
334 pub fn is_contact_request_pending(&self, user: &User) -> bool {
335 self.pending_contact_requests.contains_key(&user.id)
336 }
337
338 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
339 if self
340 .contacts
341 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
342 .is_ok()
343 {
344 ContactRequestStatus::RequestAccepted
345 } else if self
346 .outgoing_contact_requests
347 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
348 .is_ok()
349 {
350 ContactRequestStatus::RequestSent
351 } else if self
352 .incoming_contact_requests
353 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
354 .is_ok()
355 {
356 ContactRequestStatus::RequestReceived
357 } else {
358 ContactRequestStatus::None
359 }
360 }
361
362 pub fn request_contact(
363 &mut self,
364 responder_id: u64,
365 cx: &mut ModelContext<Self>,
366 ) -> Task<Result<()>> {
367 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
368 }
369
370 pub fn remove_contact(
371 &mut self,
372 user_id: u64,
373 cx: &mut ModelContext<Self>,
374 ) -> Task<Result<()>> {
375 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
376 }
377
378 pub fn respond_to_contact_request(
379 &mut self,
380 requester_id: u64,
381 accept: bool,
382 cx: &mut ModelContext<Self>,
383 ) -> Task<Result<()>> {
384 self.perform_contact_request(
385 requester_id,
386 proto::RespondToContactRequest {
387 requester_id,
388 response: if accept {
389 proto::ContactRequestResponse::Accept
390 } else {
391 proto::ContactRequestResponse::Decline
392 } as i32,
393 },
394 cx,
395 )
396 }
397
398 pub fn dismiss_contact_request(
399 &mut self,
400 requester_id: u64,
401 cx: &mut ModelContext<Self>,
402 ) -> Task<Result<()>> {
403 let client = self.client.upgrade();
404 cx.spawn_weak(|_, _| async move {
405 client
406 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
407 .request(proto::RespondToContactRequest {
408 requester_id,
409 response: proto::ContactRequestResponse::Dismiss as i32,
410 })
411 .await?;
412 Ok(())
413 })
414 }
415
416 fn perform_contact_request<T: RequestMessage>(
417 &mut self,
418 user_id: u64,
419 request: T,
420 cx: &mut ModelContext<Self>,
421 ) -> Task<Result<()>> {
422 let client = self.client.upgrade();
423 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
424 cx.notify();
425
426 cx.spawn(|this, mut cx| async move {
427 let response = client
428 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
429 .request(request)
430 .await;
431 this.update(&mut cx, |this, cx| {
432 if let Entry::Occupied(mut request_count) =
433 this.pending_contact_requests.entry(user_id)
434 {
435 *request_count.get_mut() -= 1;
436 if *request_count.get() == 0 {
437 request_count.remove();
438 }
439 }
440 cx.notify();
441 });
442 response?;
443 Ok(())
444 })
445 }
446
447 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
448 let (tx, mut rx) = postage::barrier::channel();
449 self.update_contacts_tx
450 .unbounded_send(UpdateContacts::Clear(tx))
451 .unwrap();
452 async move {
453 rx.recv().await;
454 }
455 }
456
457 pub fn get_users(
458 &mut self,
459 mut user_ids: Vec<u64>,
460 cx: &mut ModelContext<Self>,
461 ) -> Task<Result<()>> {
462 user_ids.retain(|id| !self.users.contains_key(id));
463 if user_ids.is_empty() {
464 Task::ready(Ok(()))
465 } else {
466 let load = self.load_users(proto::GetUsers { user_ids }, cx);
467 cx.foreground().spawn(async move {
468 load.await?;
469 Ok(())
470 })
471 }
472 }
473
474 pub fn fuzzy_search_users(
475 &mut self,
476 query: String,
477 cx: &mut ModelContext<Self>,
478 ) -> Task<Result<Vec<Arc<User>>>> {
479 self.load_users(proto::FuzzySearchUsers { query }, cx)
480 }
481
482 pub fn fetch_user(
483 &mut self,
484 user_id: u64,
485 cx: &mut ModelContext<Self>,
486 ) -> Task<Result<Arc<User>>> {
487 if let Some(user) = self.users.get(&user_id).cloned() {
488 return cx.foreground().spawn(async move { Ok(user) });
489 }
490
491 let load_users = self.get_users(vec![user_id], cx);
492 cx.spawn(|this, mut cx| async move {
493 load_users.await?;
494 this.update(&mut cx, |this, _| {
495 this.users
496 .get(&user_id)
497 .cloned()
498 .ok_or_else(|| anyhow!("server responded with no users"))
499 })
500 })
501 }
502
503 pub fn current_user(&self) -> Option<Arc<User>> {
504 self.current_user.borrow().clone()
505 }
506
507 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
508 self.current_user.clone()
509 }
510
511 fn load_users(
512 &mut self,
513 request: impl RequestMessage<Response = UsersResponse>,
514 cx: &mut ModelContext<Self>,
515 ) -> Task<Result<Vec<Arc<User>>>> {
516 let client = self.client.clone();
517 let http = self.http.clone();
518 cx.spawn_weak(|this, mut cx| async move {
519 if let Some(rpc) = client.upgrade() {
520 let response = rpc.request(request).await.context("error loading users")?;
521 let users = future::join_all(
522 response
523 .users
524 .into_iter()
525 .map(|user| User::new(user, http.as_ref())),
526 )
527 .await;
528
529 if let Some(this) = this.upgrade(&cx) {
530 this.update(&mut cx, |this, _| {
531 for user in &users {
532 this.users.insert(user.id, user.clone());
533 }
534 });
535 }
536 Ok(users)
537 } else {
538 Ok(Vec::new())
539 }
540 })
541 }
542}
543
544impl User {
545 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
546 Arc::new(User {
547 id: message.id,
548 github_login: message.github_login,
549 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
550 })
551 }
552}
553
554impl Contact {
555 async fn from_proto(
556 contact: proto::Contact,
557 user_store: &ModelHandle<UserStore>,
558 cx: &mut AsyncAppContext,
559 ) -> Result<Self> {
560 let user = user_store
561 .update(cx, |user_store, cx| {
562 user_store.fetch_user(contact.user_id, cx)
563 })
564 .await?;
565 let mut projects = Vec::new();
566 for project in contact.projects {
567 let mut guests = BTreeSet::new();
568 for participant_id in project.guests {
569 guests.insert(
570 user_store
571 .update(cx, |user_store, cx| {
572 user_store.fetch_user(participant_id, cx)
573 })
574 .await?,
575 );
576 }
577 projects.push(ProjectMetadata {
578 id: project.id,
579 worktree_root_names: project.worktree_root_names.clone(),
580 guests,
581 });
582 }
583 Ok(Self {
584 user,
585 online: contact.online,
586 projects,
587 })
588 }
589
590 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
591 self.projects
592 .iter()
593 .filter(|project| !project.worktree_root_names.is_empty())
594 }
595}
596
597async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
598 let mut response = http
599 .get(url, Default::default(), true)
600 .await
601 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
602
603 if !response.status().is_success() {
604 return Err(anyhow!("avatar request failed {:?}", response.status()));
605 }
606
607 let mut body = Vec::new();
608 response
609 .body_mut()
610 .read_to_end(&mut body)
611 .await
612 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
613 let format = image::guess_format(&body)?;
614 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
615 Ok(ImageData::new(image))
616}