1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
6use postage::{sink::Sink, watch};
7use rpc::proto::{RequestMessage, UsersResponse};
8use std::sync::{Arc, Weak};
9use util::TryFutureExt as _;
10
11#[derive(Default, Debug)]
12pub struct User {
13 pub id: u64,
14 pub github_login: String,
15 pub avatar: Option<Arc<ImageData>>,
16}
17
18impl PartialOrd for User {
19 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
20 Some(self.cmp(other))
21 }
22}
23
24impl Ord for User {
25 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
26 self.github_login.cmp(&other.github_login)
27 }
28}
29
30impl PartialEq for User {
31 fn eq(&self, other: &Self) -> bool {
32 self.id == other.id && self.github_login == other.github_login
33 }
34}
35
36impl Eq for User {}
37
38#[derive(Debug, PartialEq)]
39pub struct Contact {
40 pub user: Arc<User>,
41 pub online: bool,
42 pub busy: bool,
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum ContactRequestStatus {
47 None,
48 RequestSent,
49 RequestReceived,
50 RequestAccepted,
51}
52
53pub struct UserStore {
54 users: HashMap<u64, Arc<User>>,
55 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
56 current_user: watch::Receiver<Option<Arc<User>>>,
57 contacts: Vec<Arc<Contact>>,
58 incoming_contact_requests: Vec<Arc<User>>,
59 outgoing_contact_requests: Vec<Arc<User>>,
60 pending_contact_requests: HashMap<u64, usize>,
61 invite_info: Option<InviteInfo>,
62 client: Weak<Client>,
63 http: Arc<dyn HttpClient>,
64 _maintain_contacts: Task<()>,
65 _maintain_current_user: Task<()>,
66}
67
68#[derive(Clone)]
69pub struct InviteInfo {
70 pub count: u32,
71 pub url: Arc<str>,
72}
73
74pub enum Event {
75 Contact {
76 user: Arc<User>,
77 kind: ContactEventKind,
78 },
79 ShowContacts,
80}
81
82#[derive(Clone, Copy)]
83pub enum ContactEventKind {
84 Requested,
85 Accepted,
86 Cancelled,
87}
88
89impl Entity for UserStore {
90 type Event = Event;
91}
92
93enum UpdateContacts {
94 Update(proto::UpdateContacts),
95 Wait(postage::barrier::Sender),
96 Clear(postage::barrier::Sender),
97}
98
99impl UserStore {
100 pub fn new(
101 client: Arc<Client>,
102 http: Arc<dyn HttpClient>,
103 cx: &mut ModelContext<Self>,
104 ) -> Self {
105 let (mut current_user_tx, current_user_rx) = watch::channel();
106 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
107 let rpc_subscriptions = vec![
108 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
109 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
110 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
111 ];
112 Self {
113 users: Default::default(),
114 current_user: current_user_rx,
115 contacts: Default::default(),
116 incoming_contact_requests: Default::default(),
117 outgoing_contact_requests: Default::default(),
118 invite_info: None,
119 client: Arc::downgrade(&client),
120 update_contacts_tx,
121 http,
122 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
123 let _subscriptions = rpc_subscriptions;
124 while let Some(message) = update_contacts_rx.next().await {
125 if let Some(this) = this.upgrade(&cx) {
126 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
127 .log_err()
128 .await;
129 }
130 }
131 }),
132 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
133 let mut status = client.status();
134 while let Some(status) = status.next().await {
135 match status {
136 Status::Connected { .. } => {
137 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
138 let user = this
139 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
140 .log_err()
141 .await;
142 current_user_tx.send(user).await.ok();
143 }
144 }
145 Status::SignedOut => {
146 current_user_tx.send(None).await.ok();
147 if let Some(this) = this.upgrade(&cx) {
148 this.update(&mut cx, |this, _| this.clear_contacts()).await;
149 }
150 }
151 Status::ConnectionLost => {
152 if let Some(this) = this.upgrade(&cx) {
153 this.update(&mut cx, |this, _| this.clear_contacts()).await;
154 }
155 }
156 _ => {}
157 }
158 }
159 }),
160 pending_contact_requests: Default::default(),
161 }
162 }
163
164 async fn handle_update_invite_info(
165 this: ModelHandle<Self>,
166 message: TypedEnvelope<proto::UpdateInviteInfo>,
167 _: Arc<Client>,
168 mut cx: AsyncAppContext,
169 ) -> Result<()> {
170 this.update(&mut cx, |this, cx| {
171 this.invite_info = Some(InviteInfo {
172 url: Arc::from(message.payload.url),
173 count: message.payload.count,
174 });
175 cx.notify();
176 });
177 Ok(())
178 }
179
180 async fn handle_show_contacts(
181 this: ModelHandle<Self>,
182 _: TypedEnvelope<proto::ShowContacts>,
183 _: Arc<Client>,
184 mut cx: AsyncAppContext,
185 ) -> Result<()> {
186 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
187 Ok(())
188 }
189
190 pub fn invite_info(&self) -> Option<&InviteInfo> {
191 self.invite_info.as_ref()
192 }
193
194 async fn handle_update_contacts(
195 this: ModelHandle<Self>,
196 message: TypedEnvelope<proto::UpdateContacts>,
197 _: Arc<Client>,
198 mut cx: AsyncAppContext,
199 ) -> Result<()> {
200 this.update(&mut cx, |this, _| {
201 this.update_contacts_tx
202 .unbounded_send(UpdateContacts::Update(message.payload))
203 .unwrap();
204 });
205 Ok(())
206 }
207
208 fn update_contacts(
209 &mut self,
210 message: UpdateContacts,
211 cx: &mut ModelContext<Self>,
212 ) -> Task<Result<()>> {
213 match message {
214 UpdateContacts::Wait(barrier) => {
215 drop(barrier);
216 Task::ready(Ok(()))
217 }
218 UpdateContacts::Clear(barrier) => {
219 self.contacts.clear();
220 self.incoming_contact_requests.clear();
221 self.outgoing_contact_requests.clear();
222 drop(barrier);
223 Task::ready(Ok(()))
224 }
225 UpdateContacts::Update(message) => {
226 let mut user_ids = HashSet::default();
227 for contact in &message.contacts {
228 user_ids.insert(contact.user_id);
229 }
230 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
231 user_ids.extend(message.outgoing_requests.iter());
232
233 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
234 cx.spawn(|this, mut cx| async move {
235 load_users.await?;
236
237 // Users are fetched in parallel above and cached in call to get_users
238 // No need to paralellize here
239 let mut updated_contacts = Vec::new();
240 for contact in message.contacts {
241 let should_notify = contact.should_notify;
242 updated_contacts.push((
243 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
244 should_notify,
245 ));
246 }
247
248 let mut incoming_requests = Vec::new();
249 for request in message.incoming_requests {
250 incoming_requests.push({
251 let user = this
252 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
253 .await?;
254 (user, request.should_notify)
255 });
256 }
257
258 let mut outgoing_requests = Vec::new();
259 for requested_user_id in message.outgoing_requests {
260 outgoing_requests.push(
261 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
262 .await?,
263 );
264 }
265
266 let removed_contacts =
267 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
268 let removed_incoming_requests =
269 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
270 let removed_outgoing_requests =
271 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
272
273 this.update(&mut cx, |this, cx| {
274 // Remove contacts
275 this.contacts
276 .retain(|contact| !removed_contacts.contains(&contact.user.id));
277 // Update existing contacts and insert new ones
278 for (updated_contact, should_notify) in updated_contacts {
279 if should_notify {
280 cx.emit(Event::Contact {
281 user: updated_contact.user.clone(),
282 kind: ContactEventKind::Accepted,
283 });
284 }
285 match this.contacts.binary_search_by_key(
286 &&updated_contact.user.github_login,
287 |contact| &contact.user.github_login,
288 ) {
289 Ok(ix) => this.contacts[ix] = updated_contact,
290 Err(ix) => this.contacts.insert(ix, updated_contact),
291 }
292 }
293
294 // Remove incoming contact requests
295 this.incoming_contact_requests.retain(|user| {
296 if removed_incoming_requests.contains(&user.id) {
297 cx.emit(Event::Contact {
298 user: user.clone(),
299 kind: ContactEventKind::Cancelled,
300 });
301 false
302 } else {
303 true
304 }
305 });
306 // Update existing incoming requests and insert new ones
307 for (user, should_notify) in incoming_requests {
308 if should_notify {
309 cx.emit(Event::Contact {
310 user: user.clone(),
311 kind: ContactEventKind::Requested,
312 });
313 }
314
315 match this
316 .incoming_contact_requests
317 .binary_search_by_key(&&user.github_login, |contact| {
318 &contact.github_login
319 }) {
320 Ok(ix) => this.incoming_contact_requests[ix] = user,
321 Err(ix) => this.incoming_contact_requests.insert(ix, user),
322 }
323 }
324
325 // Remove outgoing contact requests
326 this.outgoing_contact_requests
327 .retain(|user| !removed_outgoing_requests.contains(&user.id));
328 // Update existing incoming requests and insert new ones
329 for request in outgoing_requests {
330 match this
331 .outgoing_contact_requests
332 .binary_search_by_key(&&request.github_login, |contact| {
333 &contact.github_login
334 }) {
335 Ok(ix) => this.outgoing_contact_requests[ix] = request,
336 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
337 }
338 }
339
340 cx.notify();
341 });
342
343 Ok(())
344 })
345 }
346 }
347 }
348
349 pub fn contacts(&self) -> &[Arc<Contact>] {
350 &self.contacts
351 }
352
353 pub fn has_contact(&self, user: &Arc<User>) -> bool {
354 self.contacts
355 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
356 .is_ok()
357 }
358
359 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
360 &self.incoming_contact_requests
361 }
362
363 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
364 &self.outgoing_contact_requests
365 }
366
367 pub fn is_contact_request_pending(&self, user: &User) -> bool {
368 self.pending_contact_requests.contains_key(&user.id)
369 }
370
371 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
372 if self
373 .contacts
374 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
375 .is_ok()
376 {
377 ContactRequestStatus::RequestAccepted
378 } else if self
379 .outgoing_contact_requests
380 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
381 .is_ok()
382 {
383 ContactRequestStatus::RequestSent
384 } else if self
385 .incoming_contact_requests
386 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
387 .is_ok()
388 {
389 ContactRequestStatus::RequestReceived
390 } else {
391 ContactRequestStatus::None
392 }
393 }
394
395 pub fn request_contact(
396 &mut self,
397 responder_id: u64,
398 cx: &mut ModelContext<Self>,
399 ) -> Task<Result<()>> {
400 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
401 }
402
403 pub fn remove_contact(
404 &mut self,
405 user_id: u64,
406 cx: &mut ModelContext<Self>,
407 ) -> Task<Result<()>> {
408 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
409 }
410
411 pub fn respond_to_contact_request(
412 &mut self,
413 requester_id: u64,
414 accept: bool,
415 cx: &mut ModelContext<Self>,
416 ) -> Task<Result<()>> {
417 self.perform_contact_request(
418 requester_id,
419 proto::RespondToContactRequest {
420 requester_id,
421 response: if accept {
422 proto::ContactRequestResponse::Accept
423 } else {
424 proto::ContactRequestResponse::Decline
425 } as i32,
426 },
427 cx,
428 )
429 }
430
431 pub fn dismiss_contact_request(
432 &mut self,
433 requester_id: u64,
434 cx: &mut ModelContext<Self>,
435 ) -> Task<Result<()>> {
436 let client = self.client.upgrade();
437 cx.spawn_weak(|_, _| async move {
438 client
439 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
440 .request(proto::RespondToContactRequest {
441 requester_id,
442 response: proto::ContactRequestResponse::Dismiss as i32,
443 })
444 .await?;
445 Ok(())
446 })
447 }
448
449 fn perform_contact_request<T: RequestMessage>(
450 &mut self,
451 user_id: u64,
452 request: T,
453 cx: &mut ModelContext<Self>,
454 ) -> Task<Result<()>> {
455 let client = self.client.upgrade();
456 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
457 cx.notify();
458
459 cx.spawn(|this, mut cx| async move {
460 let response = client
461 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
462 .request(request)
463 .await;
464 this.update(&mut cx, |this, cx| {
465 if let Entry::Occupied(mut request_count) =
466 this.pending_contact_requests.entry(user_id)
467 {
468 *request_count.get_mut() -= 1;
469 if *request_count.get() == 0 {
470 request_count.remove();
471 }
472 }
473 cx.notify();
474 });
475 response?;
476 Ok(())
477 })
478 }
479
480 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
481 let (tx, mut rx) = postage::barrier::channel();
482 self.update_contacts_tx
483 .unbounded_send(UpdateContacts::Clear(tx))
484 .unwrap();
485 async move {
486 rx.next().await;
487 }
488 }
489
490 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
491 let (tx, mut rx) = postage::barrier::channel();
492 self.update_contacts_tx
493 .unbounded_send(UpdateContacts::Wait(tx))
494 .unwrap();
495 async move {
496 rx.next().await;
497 }
498 }
499
500 pub fn get_users(
501 &mut self,
502 user_ids: Vec<u64>,
503 cx: &mut ModelContext<Self>,
504 ) -> Task<Result<Vec<Arc<User>>>> {
505 let mut user_ids_to_fetch = user_ids.clone();
506 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
507
508 cx.spawn(|this, mut cx| async move {
509 if !user_ids_to_fetch.is_empty() {
510 this.update(&mut cx, |this, cx| {
511 this.load_users(
512 proto::GetUsers {
513 user_ids: user_ids_to_fetch,
514 },
515 cx,
516 )
517 })
518 .await?;
519 }
520
521 this.read_with(&cx, |this, _| {
522 user_ids
523 .iter()
524 .map(|user_id| {
525 this.users
526 .get(user_id)
527 .cloned()
528 .ok_or_else(|| anyhow!("user {} not found", user_id))
529 })
530 .collect()
531 })
532 })
533 }
534
535 pub fn fuzzy_search_users(
536 &mut self,
537 query: String,
538 cx: &mut ModelContext<Self>,
539 ) -> Task<Result<Vec<Arc<User>>>> {
540 self.load_users(proto::FuzzySearchUsers { query }, cx)
541 }
542
543 pub fn get_user(
544 &mut self,
545 user_id: u64,
546 cx: &mut ModelContext<Self>,
547 ) -> Task<Result<Arc<User>>> {
548 if let Some(user) = self.users.get(&user_id).cloned() {
549 return cx.foreground().spawn(async move { Ok(user) });
550 }
551
552 let load_users = self.get_users(vec![user_id], cx);
553 cx.spawn(|this, mut cx| async move {
554 load_users.await?;
555 this.update(&mut cx, |this, _| {
556 this.users
557 .get(&user_id)
558 .cloned()
559 .ok_or_else(|| anyhow!("server responded with no users"))
560 })
561 })
562 }
563
564 pub fn current_user(&self) -> Option<Arc<User>> {
565 self.current_user.borrow().clone()
566 }
567
568 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
569 self.current_user.clone()
570 }
571
572 fn load_users(
573 &mut self,
574 request: impl RequestMessage<Response = UsersResponse>,
575 cx: &mut ModelContext<Self>,
576 ) -> Task<Result<Vec<Arc<User>>>> {
577 let client = self.client.clone();
578 let http = self.http.clone();
579 cx.spawn_weak(|this, mut cx| async move {
580 if let Some(rpc) = client.upgrade() {
581 let response = rpc.request(request).await.context("error loading users")?;
582 let users = future::join_all(
583 response
584 .users
585 .into_iter()
586 .map(|user| User::new(user, http.as_ref())),
587 )
588 .await;
589
590 if let Some(this) = this.upgrade(&cx) {
591 this.update(&mut cx, |this, _| {
592 for user in &users {
593 this.users.insert(user.id, user.clone());
594 }
595 });
596 }
597 Ok(users)
598 } else {
599 Ok(Vec::new())
600 }
601 })
602 }
603}
604
605impl User {
606 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
607 Arc::new(User {
608 id: message.id,
609 github_login: message.github_login,
610 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
611 })
612 }
613}
614
615impl Contact {
616 async fn from_proto(
617 contact: proto::Contact,
618 user_store: &ModelHandle<UserStore>,
619 cx: &mut AsyncAppContext,
620 ) -> Result<Self> {
621 let user = user_store
622 .update(cx, |user_store, cx| {
623 user_store.get_user(contact.user_id, cx)
624 })
625 .await?;
626 Ok(Self {
627 user,
628 online: contact.online,
629 busy: contact.busy,
630 })
631 }
632}
633
634async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
635 let mut response = http
636 .get(url, Default::default(), true)
637 .await
638 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
639
640 if !response.status().is_success() {
641 return Err(anyhow!("avatar request failed {:?}", response.status()));
642 }
643
644 let mut body = Vec::new();
645 response
646 .body_mut()
647 .read_to_end(&mut body)
648 .await
649 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
650 let format = image::guess_format(&body)?;
651 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
652 Ok(ImageData::new(image))
653}