user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  6use postage::{sink::Sink, watch};
  7use rpc::proto::{RequestMessage, UsersResponse};
  8use settings::Settings;
  9use staff_mode::StaffMode;
 10use std::sync::{Arc, Weak};
 11use util::http::HttpClient;
 12use util::TryFutureExt as _;
 13
 14#[derive(Default, Debug)]
 15pub struct User {
 16    pub id: u64,
 17    pub github_login: String,
 18    pub avatar: Option<Arc<ImageData>>,
 19}
 20
 21impl PartialOrd for User {
 22    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 23        Some(self.cmp(other))
 24    }
 25}
 26
 27impl Ord for User {
 28    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 29        self.github_login.cmp(&other.github_login)
 30    }
 31}
 32
 33impl PartialEq for User {
 34    fn eq(&self, other: &Self) -> bool {
 35        self.id == other.id && self.github_login == other.github_login
 36    }
 37}
 38
 39impl Eq for User {}
 40
 41#[derive(Debug, PartialEq)]
 42pub struct Contact {
 43    pub user: Arc<User>,
 44    pub online: bool,
 45    pub busy: bool,
 46}
 47
 48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 49pub enum ContactRequestStatus {
 50    None,
 51    RequestSent,
 52    RequestReceived,
 53    RequestAccepted,
 54}
 55
 56pub struct UserStore {
 57    users: HashMap<u64, Arc<User>>,
 58    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 59    current_user: watch::Receiver<Option<Arc<User>>>,
 60    contacts: Vec<Arc<Contact>>,
 61    incoming_contact_requests: Vec<Arc<User>>,
 62    outgoing_contact_requests: Vec<Arc<User>>,
 63    pending_contact_requests: HashMap<u64, usize>,
 64    invite_info: Option<InviteInfo>,
 65    client: Weak<Client>,
 66    http: Arc<dyn HttpClient>,
 67    _maintain_contacts: Task<()>,
 68    _maintain_current_user: Task<()>,
 69}
 70
 71#[derive(Clone)]
 72pub struct InviteInfo {
 73    pub count: u32,
 74    pub url: Arc<str>,
 75}
 76
 77pub enum Event {
 78    Contact {
 79        user: Arc<User>,
 80        kind: ContactEventKind,
 81    },
 82    ShowContacts,
 83}
 84
 85#[derive(Clone, Copy)]
 86pub enum ContactEventKind {
 87    Requested,
 88    Accepted,
 89    Cancelled,
 90}
 91
 92impl Entity for UserStore {
 93    type Event = Event;
 94}
 95
 96enum UpdateContacts {
 97    Update(proto::UpdateContacts),
 98    Wait(postage::barrier::Sender),
 99    Clear(postage::barrier::Sender),
100}
101
102impl UserStore {
103    pub fn new(
104        client: Arc<Client>,
105        http: Arc<dyn HttpClient>,
106        cx: &mut ModelContext<Self>,
107    ) -> Self {
108        let (mut current_user_tx, current_user_rx) = watch::channel();
109        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
110        let rpc_subscriptions = vec![
111            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
112            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
113            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
114        ];
115        Self {
116            users: Default::default(),
117            current_user: current_user_rx,
118            contacts: Default::default(),
119            incoming_contact_requests: Default::default(),
120            outgoing_contact_requests: Default::default(),
121            invite_info: None,
122            client: Arc::downgrade(&client),
123            update_contacts_tx,
124            http,
125            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
126                let _subscriptions = rpc_subscriptions;
127                while let Some(message) = update_contacts_rx.next().await {
128                    if let Some(this) = this.upgrade(&cx) {
129                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
130                            .log_err()
131                            .await;
132                    }
133                }
134            }),
135            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
136                let mut status = client.status();
137                while let Some(status) = status.next().await {
138                    match status {
139                        Status::Connected { .. } => {
140                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
141                                let fetch_user = this
142                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
143                                    .log_err();
144                                let fetch_metrics_id =
145                                    client.request(proto::GetPrivateUserInfo {}).log_err();
146                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
147                                client.telemetry.set_authenticated_user_info(
148                                    info.as_ref().map(|info| info.metrics_id.clone()),
149                                    info.as_ref().map(|info| info.staff).unwrap_or(false),
150                                    cx.read(|cx| cx.global::<Settings>().telemetry()),
151                                );
152
153                                cx.update(|cx| {
154                                    cx.update_default_global(|staff_mode: &mut StaffMode, _| {
155                                        if !staff_mode.0 {
156                                            *staff_mode = StaffMode(
157                                                info.as_ref()
158                                                    .map(|info| info.staff)
159                                                    .unwrap_or_default(),
160                                            )
161                                        }
162                                        ()
163                                    });
164                                });
165
166                                current_user_tx.send(user).await.ok();
167                            }
168                        }
169                        Status::SignedOut => {
170                            current_user_tx.send(None).await.ok();
171                            if let Some(this) = this.upgrade(&cx) {
172                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
173                            }
174                        }
175                        Status::ConnectionLost => {
176                            if let Some(this) = this.upgrade(&cx) {
177                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
178                            }
179                        }
180                        _ => {}
181                    }
182                }
183            }),
184            pending_contact_requests: Default::default(),
185        }
186    }
187
188    #[cfg(feature = "test-support")]
189    pub fn clear_cache(&mut self) {
190        self.users.clear();
191    }
192
193    async fn handle_update_invite_info(
194        this: ModelHandle<Self>,
195        message: TypedEnvelope<proto::UpdateInviteInfo>,
196        _: Arc<Client>,
197        mut cx: AsyncAppContext,
198    ) -> Result<()> {
199        this.update(&mut cx, |this, cx| {
200            this.invite_info = Some(InviteInfo {
201                url: Arc::from(message.payload.url),
202                count: message.payload.count,
203            });
204            cx.notify();
205        });
206        Ok(())
207    }
208
209    async fn handle_show_contacts(
210        this: ModelHandle<Self>,
211        _: TypedEnvelope<proto::ShowContacts>,
212        _: Arc<Client>,
213        mut cx: AsyncAppContext,
214    ) -> Result<()> {
215        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
216        Ok(())
217    }
218
219    pub fn invite_info(&self) -> Option<&InviteInfo> {
220        self.invite_info.as_ref()
221    }
222
223    async fn handle_update_contacts(
224        this: ModelHandle<Self>,
225        message: TypedEnvelope<proto::UpdateContacts>,
226        _: Arc<Client>,
227        mut cx: AsyncAppContext,
228    ) -> Result<()> {
229        this.update(&mut cx, |this, _| {
230            this.update_contacts_tx
231                .unbounded_send(UpdateContacts::Update(message.payload))
232                .unwrap();
233        });
234        Ok(())
235    }
236
237    fn update_contacts(
238        &mut self,
239        message: UpdateContacts,
240        cx: &mut ModelContext<Self>,
241    ) -> Task<Result<()>> {
242        match message {
243            UpdateContacts::Wait(barrier) => {
244                drop(barrier);
245                Task::ready(Ok(()))
246            }
247            UpdateContacts::Clear(barrier) => {
248                self.contacts.clear();
249                self.incoming_contact_requests.clear();
250                self.outgoing_contact_requests.clear();
251                drop(barrier);
252                Task::ready(Ok(()))
253            }
254            UpdateContacts::Update(message) => {
255                let mut user_ids = HashSet::default();
256                for contact in &message.contacts {
257                    user_ids.insert(contact.user_id);
258                }
259                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
260                user_ids.extend(message.outgoing_requests.iter());
261
262                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
263                cx.spawn(|this, mut cx| async move {
264                    load_users.await?;
265
266                    // Users are fetched in parallel above and cached in call to get_users
267                    // No need to paralellize here
268                    let mut updated_contacts = Vec::new();
269                    for contact in message.contacts {
270                        let should_notify = contact.should_notify;
271                        updated_contacts.push((
272                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
273                            should_notify,
274                        ));
275                    }
276
277                    let mut incoming_requests = Vec::new();
278                    for request in message.incoming_requests {
279                        incoming_requests.push({
280                            let user = this
281                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
282                                .await?;
283                            (user, request.should_notify)
284                        });
285                    }
286
287                    let mut outgoing_requests = Vec::new();
288                    for requested_user_id in message.outgoing_requests {
289                        outgoing_requests.push(
290                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
291                                .await?,
292                        );
293                    }
294
295                    let removed_contacts =
296                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
297                    let removed_incoming_requests =
298                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
299                    let removed_outgoing_requests =
300                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
301
302                    this.update(&mut cx, |this, cx| {
303                        // Remove contacts
304                        this.contacts
305                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
306                        // Update existing contacts and insert new ones
307                        for (updated_contact, should_notify) in updated_contacts {
308                            if should_notify {
309                                cx.emit(Event::Contact {
310                                    user: updated_contact.user.clone(),
311                                    kind: ContactEventKind::Accepted,
312                                });
313                            }
314                            match this.contacts.binary_search_by_key(
315                                &&updated_contact.user.github_login,
316                                |contact| &contact.user.github_login,
317                            ) {
318                                Ok(ix) => this.contacts[ix] = updated_contact,
319                                Err(ix) => this.contacts.insert(ix, updated_contact),
320                            }
321                        }
322
323                        // Remove incoming contact requests
324                        this.incoming_contact_requests.retain(|user| {
325                            if removed_incoming_requests.contains(&user.id) {
326                                cx.emit(Event::Contact {
327                                    user: user.clone(),
328                                    kind: ContactEventKind::Cancelled,
329                                });
330                                false
331                            } else {
332                                true
333                            }
334                        });
335                        // Update existing incoming requests and insert new ones
336                        for (user, should_notify) in incoming_requests {
337                            if should_notify {
338                                cx.emit(Event::Contact {
339                                    user: user.clone(),
340                                    kind: ContactEventKind::Requested,
341                                });
342                            }
343
344                            match this
345                                .incoming_contact_requests
346                                .binary_search_by_key(&&user.github_login, |contact| {
347                                    &contact.github_login
348                                }) {
349                                Ok(ix) => this.incoming_contact_requests[ix] = user,
350                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
351                            }
352                        }
353
354                        // Remove outgoing contact requests
355                        this.outgoing_contact_requests
356                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
357                        // Update existing incoming requests and insert new ones
358                        for request in outgoing_requests {
359                            match this
360                                .outgoing_contact_requests
361                                .binary_search_by_key(&&request.github_login, |contact| {
362                                    &contact.github_login
363                                }) {
364                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
365                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
366                            }
367                        }
368
369                        cx.notify();
370                    });
371
372                    Ok(())
373                })
374            }
375        }
376    }
377
378    pub fn contacts(&self) -> &[Arc<Contact>] {
379        &self.contacts
380    }
381
382    pub fn has_contact(&self, user: &Arc<User>) -> bool {
383        self.contacts
384            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
385            .is_ok()
386    }
387
388    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
389        &self.incoming_contact_requests
390    }
391
392    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
393        &self.outgoing_contact_requests
394    }
395
396    pub fn is_contact_request_pending(&self, user: &User) -> bool {
397        self.pending_contact_requests.contains_key(&user.id)
398    }
399
400    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
401        if self
402            .contacts
403            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
404            .is_ok()
405        {
406            ContactRequestStatus::RequestAccepted
407        } else if self
408            .outgoing_contact_requests
409            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
410            .is_ok()
411        {
412            ContactRequestStatus::RequestSent
413        } else if self
414            .incoming_contact_requests
415            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
416            .is_ok()
417        {
418            ContactRequestStatus::RequestReceived
419        } else {
420            ContactRequestStatus::None
421        }
422    }
423
424    pub fn request_contact(
425        &mut self,
426        responder_id: u64,
427        cx: &mut ModelContext<Self>,
428    ) -> Task<Result<()>> {
429        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
430    }
431
432    pub fn remove_contact(
433        &mut self,
434        user_id: u64,
435        cx: &mut ModelContext<Self>,
436    ) -> Task<Result<()>> {
437        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
438    }
439
440    pub fn respond_to_contact_request(
441        &mut self,
442        requester_id: u64,
443        accept: bool,
444        cx: &mut ModelContext<Self>,
445    ) -> Task<Result<()>> {
446        self.perform_contact_request(
447            requester_id,
448            proto::RespondToContactRequest {
449                requester_id,
450                response: if accept {
451                    proto::ContactRequestResponse::Accept
452                } else {
453                    proto::ContactRequestResponse::Decline
454                } as i32,
455            },
456            cx,
457        )
458    }
459
460    pub fn dismiss_contact_request(
461        &mut self,
462        requester_id: u64,
463        cx: &mut ModelContext<Self>,
464    ) -> Task<Result<()>> {
465        let client = self.client.upgrade();
466        cx.spawn_weak(|_, _| async move {
467            client
468                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
469                .request(proto::RespondToContactRequest {
470                    requester_id,
471                    response: proto::ContactRequestResponse::Dismiss as i32,
472                })
473                .await?;
474            Ok(())
475        })
476    }
477
478    fn perform_contact_request<T: RequestMessage>(
479        &mut self,
480        user_id: u64,
481        request: T,
482        cx: &mut ModelContext<Self>,
483    ) -> Task<Result<()>> {
484        let client = self.client.upgrade();
485        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
486        cx.notify();
487
488        cx.spawn(|this, mut cx| async move {
489            let response = client
490                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
491                .request(request)
492                .await;
493            this.update(&mut cx, |this, cx| {
494                if let Entry::Occupied(mut request_count) =
495                    this.pending_contact_requests.entry(user_id)
496                {
497                    *request_count.get_mut() -= 1;
498                    if *request_count.get() == 0 {
499                        request_count.remove();
500                    }
501                }
502                cx.notify();
503            });
504            response?;
505            Ok(())
506        })
507    }
508
509    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
510        let (tx, mut rx) = postage::barrier::channel();
511        self.update_contacts_tx
512            .unbounded_send(UpdateContacts::Clear(tx))
513            .unwrap();
514        async move {
515            rx.next().await;
516        }
517    }
518
519    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
520        let (tx, mut rx) = postage::barrier::channel();
521        self.update_contacts_tx
522            .unbounded_send(UpdateContacts::Wait(tx))
523            .unwrap();
524        async move {
525            rx.next().await;
526        }
527    }
528
529    pub fn get_users(
530        &mut self,
531        user_ids: Vec<u64>,
532        cx: &mut ModelContext<Self>,
533    ) -> Task<Result<Vec<Arc<User>>>> {
534        let mut user_ids_to_fetch = user_ids.clone();
535        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
536
537        cx.spawn(|this, mut cx| async move {
538            if !user_ids_to_fetch.is_empty() {
539                this.update(&mut cx, |this, cx| {
540                    this.load_users(
541                        proto::GetUsers {
542                            user_ids: user_ids_to_fetch,
543                        },
544                        cx,
545                    )
546                })
547                .await?;
548            }
549
550            this.read_with(&cx, |this, _| {
551                user_ids
552                    .iter()
553                    .map(|user_id| {
554                        this.users
555                            .get(user_id)
556                            .cloned()
557                            .ok_or_else(|| anyhow!("user {} not found", user_id))
558                    })
559                    .collect()
560            })
561        })
562    }
563
564    pub fn fuzzy_search_users(
565        &mut self,
566        query: String,
567        cx: &mut ModelContext<Self>,
568    ) -> Task<Result<Vec<Arc<User>>>> {
569        self.load_users(proto::FuzzySearchUsers { query }, cx)
570    }
571
572    pub fn get_user(
573        &mut self,
574        user_id: u64,
575        cx: &mut ModelContext<Self>,
576    ) -> Task<Result<Arc<User>>> {
577        if let Some(user) = self.users.get(&user_id).cloned() {
578            return cx.foreground().spawn(async move { Ok(user) });
579        }
580
581        let load_users = self.get_users(vec![user_id], cx);
582        cx.spawn(|this, mut cx| async move {
583            load_users.await?;
584            this.update(&mut cx, |this, _| {
585                this.users
586                    .get(&user_id)
587                    .cloned()
588                    .ok_or_else(|| anyhow!("server responded with no users"))
589            })
590        })
591    }
592
593    pub fn current_user(&self) -> Option<Arc<User>> {
594        self.current_user.borrow().clone()
595    }
596
597    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
598        self.current_user.clone()
599    }
600
601    fn load_users(
602        &mut self,
603        request: impl RequestMessage<Response = UsersResponse>,
604        cx: &mut ModelContext<Self>,
605    ) -> Task<Result<Vec<Arc<User>>>> {
606        let client = self.client.clone();
607        let http = self.http.clone();
608        cx.spawn_weak(|this, mut cx| async move {
609            if let Some(rpc) = client.upgrade() {
610                let response = rpc.request(request).await.context("error loading users")?;
611                let users = future::join_all(
612                    response
613                        .users
614                        .into_iter()
615                        .map(|user| User::new(user, http.as_ref())),
616                )
617                .await;
618
619                if let Some(this) = this.upgrade(&cx) {
620                    this.update(&mut cx, |this, _| {
621                        for user in &users {
622                            this.users.insert(user.id, user.clone());
623                        }
624                    });
625                }
626                Ok(users)
627            } else {
628                Ok(Vec::new())
629            }
630        })
631    }
632}
633
634impl User {
635    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
636        Arc::new(User {
637            id: message.id,
638            github_login: message.github_login,
639            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
640        })
641    }
642}
643
644impl Contact {
645    async fn from_proto(
646        contact: proto::Contact,
647        user_store: &ModelHandle<UserStore>,
648        cx: &mut AsyncAppContext,
649    ) -> Result<Self> {
650        let user = user_store
651            .update(cx, |user_store, cx| {
652                user_store.get_user(contact.user_id, cx)
653            })
654            .await?;
655        Ok(Self {
656            user,
657            online: contact.online,
658            busy: contact.busy,
659        })
660    }
661}
662
663async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
664    let mut response = http
665        .get(url, Default::default(), true)
666        .await
667        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
668
669    if !response.status().is_success() {
670        return Err(anyhow!("avatar request failed {:?}", response.status()));
671    }
672
673    let mut body = Vec::new();
674    response
675        .body_mut()
676        .read_to_end(&mut body)
677        .await
678        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
679    let format = image::guess_format(&body)?;
680    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
681    Ok(ImageData::new(image))
682}