user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  6use postage::{sink::Sink, watch};
  7use rpc::proto::{RequestMessage, UsersResponse};
  8use staff_mode::StaffMode;
  9use std::sync::{Arc, Weak};
 10use util::http::HttpClient;
 11use util::TryFutureExt as _;
 12
 /// Identifier type used for [`User::id`].
 pub type UserId = u64;
 14
 15#[derive(Default, Debug)]
 16pub struct User {
 17    pub id: UserId,
 18    pub github_login: String,
 19    pub avatar: Option<Arc<ImageData>>,
 20}
 21
 22impl PartialOrd for User {
 23    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 24        Some(self.cmp(other))
 25    }
 26}
 27
 28impl Ord for User {
 29    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 30        self.github_login.cmp(&other.github_login)
 31    }
 32}
 33
 34impl PartialEq for User {
 35    fn eq(&self, other: &Self) -> bool {
 36        self.id == other.id && self.github_login == other.github_login
 37    }
 38}
 39
 40impl Eq for User {}
 41
 42#[derive(Debug, PartialEq)]
 43pub struct Contact {
 44    pub user: Arc<User>,
 45    pub online: bool,
 46    pub busy: bool,
 47}
 48
 /// Relationship between the current user and another user, as reported by
 /// `UserStore::contact_request_status`.
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum ContactRequestStatus {
     /// The user is neither a contact nor part of any contact request.
     None,
     /// The user appears in the outgoing contact requests.
     RequestSent,
     /// The user appears in the incoming contact requests.
     RequestReceived,
     /// The user appears in the contact list.
     RequestAccepted,
 }
 56
 57pub struct UserStore {
 58    users: HashMap<u64, Arc<User>>,
 59    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 60    current_user: watch::Receiver<Option<Arc<User>>>,
 61    contacts: Vec<Arc<Contact>>,
 62    incoming_contact_requests: Vec<Arc<User>>,
 63    outgoing_contact_requests: Vec<Arc<User>>,
 64    pending_contact_requests: HashMap<u64, usize>,
 65    invite_info: Option<InviteInfo>,
 66    client: Weak<Client>,
 67    http: Arc<dyn HttpClient>,
 68    _maintain_contacts: Task<()>,
 69    _maintain_current_user: Task<()>,
 70}
 71
 /// Invite details taken from a `proto::UpdateInviteInfo` message.
 #[derive(Clone)]
 pub struct InviteInfo {
     /// The `count` value carried by the message.
     pub count: u32,
     /// The `url` value carried by the message.
     pub url: Arc<str>,
 }
 77
 78pub enum Event {
 79    Contact {
 80        user: Arc<User>,
 81        kind: ContactEventKind,
 82    },
 83    ShowContacts,
 84}
 85
 /// The kind of change reported by `Event::Contact`.
 #[derive(Copy, Clone)]
 pub enum ContactEventKind {
     /// An incoming contact request arrived and asked to be notified about.
     Requested,
     /// A contact was added or updated and asked to be notified about.
     Accepted,
     /// An incoming contact request was removed.
     Cancelled,
 }
 92
 93impl Entity for UserStore {
 94    type Event = Event;
 95}
 96
 97enum UpdateContacts {
 98    Update(proto::UpdateContacts),
 99    Wait(postage::barrier::Sender),
100    Clear(postage::barrier::Sender),
101}
102
103impl UserStore {
104    pub fn new(
105        client: Arc<Client>,
106        http: Arc<dyn HttpClient>,
107        cx: &mut ModelContext<Self>,
108    ) -> Self {
109        let (mut current_user_tx, current_user_rx) = watch::channel();
110        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
111        let rpc_subscriptions = vec![
112            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
113            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
114            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
115        ];
116        Self {
117            users: Default::default(),
118            current_user: current_user_rx,
119            contacts: Default::default(),
120            incoming_contact_requests: Default::default(),
121            outgoing_contact_requests: Default::default(),
122            invite_info: None,
123            client: Arc::downgrade(&client),
124            update_contacts_tx,
125            http,
126            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
127                let _subscriptions = rpc_subscriptions;
128                while let Some(message) = update_contacts_rx.next().await {
129                    if let Some(this) = this.upgrade(&cx) {
130                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
131                            .log_err()
132                            .await;
133                    }
134                }
135            }),
136            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
137                let mut status = client.status();
138                while let Some(status) = status.next().await {
139                    match status {
140                        Status::Connected { .. } => {
141                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
142                                let fetch_user = this
143                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
144                                    .log_err();
145                                let fetch_metrics_id =
146                                    client.request(proto::GetPrivateUserInfo {}).log_err();
147                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
148                                cx.read(|cx| {
149                                    client.telemetry.set_authenticated_user_info(
150                                        info.as_ref().map(|info| info.metrics_id.clone()),
151                                        info.as_ref().map(|info| info.staff).unwrap_or(false),
152                                        cx,
153                                    )
154                                });
155
156                                cx.update(|cx| {
157                                    cx.update_default_global(|staff_mode: &mut StaffMode, _| {
158                                        if !staff_mode.0 {
159                                            *staff_mode = StaffMode(
160                                                info.as_ref()
161                                                    .map(|info| info.staff)
162                                                    .unwrap_or_default(),
163                                            )
164                                        }
165                                        ()
166                                    });
167                                });
168
169                                current_user_tx.send(user).await.ok();
170
171                                this.update(&mut cx, |_, cx| {
172                                    cx.notify();
173                                });
174                            }
175                        }
176                        Status::SignedOut => {
177                            current_user_tx.send(None).await.ok();
178                            if let Some(this) = this.upgrade(&cx) {
179                                this.update(&mut cx, |this, cx| {
180                                    cx.notify();
181                                    this.clear_contacts()
182                                })
183                                .await;
184                            }
185                        }
186                        Status::ConnectionLost => {
187                            if let Some(this) = this.upgrade(&cx) {
188                                this.update(&mut cx, |this, cx| {
189                                    cx.notify();
190                                    this.clear_contacts()
191                                })
192                                .await;
193                            }
194                        }
195                        _ => {}
196                    }
197                }
198            }),
199            pending_contact_requests: Default::default(),
200        }
201    }
202
/// Forgets every cached user. Only compiled with the `test-support` feature.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    let cache = &mut self.users;
    cache.clear();
}
207
208    async fn handle_update_invite_info(
209        this: ModelHandle<Self>,
210        message: TypedEnvelope<proto::UpdateInviteInfo>,
211        _: Arc<Client>,
212        mut cx: AsyncAppContext,
213    ) -> Result<()> {
214        this.update(&mut cx, |this, cx| {
215            this.invite_info = Some(InviteInfo {
216                url: Arc::from(message.payload.url),
217                count: message.payload.count,
218            });
219            cx.notify();
220        });
221        Ok(())
222    }
223
224    async fn handle_show_contacts(
225        this: ModelHandle<Self>,
226        _: TypedEnvelope<proto::ShowContacts>,
227        _: Arc<Client>,
228        mut cx: AsyncAppContext,
229    ) -> Result<()> {
230        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
231        Ok(())
232    }
233
234    pub fn invite_info(&self) -> Option<&InviteInfo> {
235        self.invite_info.as_ref()
236    }
237
238    async fn handle_update_contacts(
239        this: ModelHandle<Self>,
240        message: TypedEnvelope<proto::UpdateContacts>,
241        _: Arc<Client>,
242        mut cx: AsyncAppContext,
243    ) -> Result<()> {
244        this.update(&mut cx, |this, _| {
245            this.update_contacts_tx
246                .unbounded_send(UpdateContacts::Update(message.payload))
247                .unwrap();
248        });
249        Ok(())
250    }
251
252    fn update_contacts(
253        &mut self,
254        message: UpdateContacts,
255        cx: &mut ModelContext<Self>,
256    ) -> Task<Result<()>> {
257        match message {
258            UpdateContacts::Wait(barrier) => {
259                drop(barrier);
260                Task::ready(Ok(()))
261            }
262            UpdateContacts::Clear(barrier) => {
263                self.contacts.clear();
264                self.incoming_contact_requests.clear();
265                self.outgoing_contact_requests.clear();
266                drop(barrier);
267                Task::ready(Ok(()))
268            }
269            UpdateContacts::Update(message) => {
270                let mut user_ids = HashSet::default();
271                for contact in &message.contacts {
272                    user_ids.insert(contact.user_id);
273                }
274                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
275                user_ids.extend(message.outgoing_requests.iter());
276
277                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
278                cx.spawn(|this, mut cx| async move {
279                    load_users.await?;
280
281                    // Users are fetched in parallel above and cached in call to get_users
282                    // No need to paralellize here
283                    let mut updated_contacts = Vec::new();
284                    for contact in message.contacts {
285                        let should_notify = contact.should_notify;
286                        updated_contacts.push((
287                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
288                            should_notify,
289                        ));
290                    }
291
292                    let mut incoming_requests = Vec::new();
293                    for request in message.incoming_requests {
294                        incoming_requests.push({
295                            let user = this
296                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
297                                .await?;
298                            (user, request.should_notify)
299                        });
300                    }
301
302                    let mut outgoing_requests = Vec::new();
303                    for requested_user_id in message.outgoing_requests {
304                        outgoing_requests.push(
305                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
306                                .await?,
307                        );
308                    }
309
310                    let removed_contacts =
311                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
312                    let removed_incoming_requests =
313                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
314                    let removed_outgoing_requests =
315                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
316
317                    this.update(&mut cx, |this, cx| {
318                        // Remove contacts
319                        this.contacts
320                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
321                        // Update existing contacts and insert new ones
322                        for (updated_contact, should_notify) in updated_contacts {
323                            if should_notify {
324                                cx.emit(Event::Contact {
325                                    user: updated_contact.user.clone(),
326                                    kind: ContactEventKind::Accepted,
327                                });
328                            }
329                            match this.contacts.binary_search_by_key(
330                                &&updated_contact.user.github_login,
331                                |contact| &contact.user.github_login,
332                            ) {
333                                Ok(ix) => this.contacts[ix] = updated_contact,
334                                Err(ix) => this.contacts.insert(ix, updated_contact),
335                            }
336                        }
337
338                        // Remove incoming contact requests
339                        this.incoming_contact_requests.retain(|user| {
340                            if removed_incoming_requests.contains(&user.id) {
341                                cx.emit(Event::Contact {
342                                    user: user.clone(),
343                                    kind: ContactEventKind::Cancelled,
344                                });
345                                false
346                            } else {
347                                true
348                            }
349                        });
350                        // Update existing incoming requests and insert new ones
351                        for (user, should_notify) in incoming_requests {
352                            if should_notify {
353                                cx.emit(Event::Contact {
354                                    user: user.clone(),
355                                    kind: ContactEventKind::Requested,
356                                });
357                            }
358
359                            match this
360                                .incoming_contact_requests
361                                .binary_search_by_key(&&user.github_login, |contact| {
362                                    &contact.github_login
363                                }) {
364                                Ok(ix) => this.incoming_contact_requests[ix] = user,
365                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
366                            }
367                        }
368
369                        // Remove outgoing contact requests
370                        this.outgoing_contact_requests
371                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
372                        // Update existing incoming requests and insert new ones
373                        for request in outgoing_requests {
374                            match this
375                                .outgoing_contact_requests
376                                .binary_search_by_key(&&request.github_login, |contact| {
377                                    &contact.github_login
378                                }) {
379                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
380                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
381                            }
382                        }
383
384                        cx.notify();
385                    });
386
387                    Ok(())
388                })
389            }
390        }
391    }
392
393    pub fn contacts(&self) -> &[Arc<Contact>] {
394        &self.contacts
395    }
396
397    pub fn has_contact(&self, user: &Arc<User>) -> bool {
398        self.contacts
399            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
400            .is_ok()
401    }
402
403    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
404        &self.incoming_contact_requests
405    }
406
407    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
408        &self.outgoing_contact_requests
409    }
410
411    pub fn is_contact_request_pending(&self, user: &User) -> bool {
412        self.pending_contact_requests.contains_key(&user.id)
413    }
414
415    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
416        if self
417            .contacts
418            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
419            .is_ok()
420        {
421            ContactRequestStatus::RequestAccepted
422        } else if self
423            .outgoing_contact_requests
424            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
425            .is_ok()
426        {
427            ContactRequestStatus::RequestSent
428        } else if self
429            .incoming_contact_requests
430            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
431            .is_ok()
432        {
433            ContactRequestStatus::RequestReceived
434        } else {
435            ContactRequestStatus::None
436        }
437    }
438
439    pub fn request_contact(
440        &mut self,
441        responder_id: u64,
442        cx: &mut ModelContext<Self>,
443    ) -> Task<Result<()>> {
444        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
445    }
446
447    pub fn remove_contact(
448        &mut self,
449        user_id: u64,
450        cx: &mut ModelContext<Self>,
451    ) -> Task<Result<()>> {
452        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
453    }
454
455    pub fn respond_to_contact_request(
456        &mut self,
457        requester_id: u64,
458        accept: bool,
459        cx: &mut ModelContext<Self>,
460    ) -> Task<Result<()>> {
461        self.perform_contact_request(
462            requester_id,
463            proto::RespondToContactRequest {
464                requester_id,
465                response: if accept {
466                    proto::ContactRequestResponse::Accept
467                } else {
468                    proto::ContactRequestResponse::Decline
469                } as i32,
470            },
471            cx,
472        )
473    }
474
475    pub fn dismiss_contact_request(
476        &mut self,
477        requester_id: u64,
478        cx: &mut ModelContext<Self>,
479    ) -> Task<Result<()>> {
480        let client = self.client.upgrade();
481        cx.spawn_weak(|_, _| async move {
482            client
483                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
484                .request(proto::RespondToContactRequest {
485                    requester_id,
486                    response: proto::ContactRequestResponse::Dismiss as i32,
487                })
488                .await?;
489            Ok(())
490        })
491    }
492
493    fn perform_contact_request<T: RequestMessage>(
494        &mut self,
495        user_id: u64,
496        request: T,
497        cx: &mut ModelContext<Self>,
498    ) -> Task<Result<()>> {
499        let client = self.client.upgrade();
500        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
501        cx.notify();
502
503        cx.spawn(|this, mut cx| async move {
504            let response = client
505                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
506                .request(request)
507                .await;
508            this.update(&mut cx, |this, cx| {
509                if let Entry::Occupied(mut request_count) =
510                    this.pending_contact_requests.entry(user_id)
511                {
512                    *request_count.get_mut() -= 1;
513                    if *request_count.get() == 0 {
514                        request_count.remove();
515                    }
516                }
517                cx.notify();
518            });
519            response?;
520            Ok(())
521        })
522    }
523
524    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
525        let (tx, mut rx) = postage::barrier::channel();
526        self.update_contacts_tx
527            .unbounded_send(UpdateContacts::Clear(tx))
528            .unwrap();
529        async move {
530            rx.next().await;
531        }
532    }
533
534    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
535        let (tx, mut rx) = postage::barrier::channel();
536        self.update_contacts_tx
537            .unbounded_send(UpdateContacts::Wait(tx))
538            .unwrap();
539        async move {
540            rx.next().await;
541        }
542    }
543
544    pub fn get_users(
545        &mut self,
546        user_ids: Vec<u64>,
547        cx: &mut ModelContext<Self>,
548    ) -> Task<Result<Vec<Arc<User>>>> {
549        let mut user_ids_to_fetch = user_ids.clone();
550        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
551
552        cx.spawn(|this, mut cx| async move {
553            if !user_ids_to_fetch.is_empty() {
554                this.update(&mut cx, |this, cx| {
555                    this.load_users(
556                        proto::GetUsers {
557                            user_ids: user_ids_to_fetch,
558                        },
559                        cx,
560                    )
561                })
562                .await?;
563            }
564
565            this.read_with(&cx, |this, _| {
566                user_ids
567                    .iter()
568                    .map(|user_id| {
569                        this.users
570                            .get(user_id)
571                            .cloned()
572                            .ok_or_else(|| anyhow!("user {} not found", user_id))
573                    })
574                    .collect()
575            })
576        })
577    }
578
579    pub fn fuzzy_search_users(
580        &mut self,
581        query: String,
582        cx: &mut ModelContext<Self>,
583    ) -> Task<Result<Vec<Arc<User>>>> {
584        self.load_users(proto::FuzzySearchUsers { query }, cx)
585    }
586
587    pub fn get_user(
588        &mut self,
589        user_id: u64,
590        cx: &mut ModelContext<Self>,
591    ) -> Task<Result<Arc<User>>> {
592        if let Some(user) = self.users.get(&user_id).cloned() {
593            return cx.foreground().spawn(async move { Ok(user) });
594        }
595
596        let load_users = self.get_users(vec![user_id], cx);
597        cx.spawn(|this, mut cx| async move {
598            load_users.await?;
599            this.update(&mut cx, |this, _| {
600                this.users
601                    .get(&user_id)
602                    .cloned()
603                    .ok_or_else(|| anyhow!("server responded with no users"))
604            })
605        })
606    }
607
608    pub fn current_user(&self) -> Option<Arc<User>> {
609        self.current_user.borrow().clone()
610    }
611
612    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
613        self.current_user.clone()
614    }
615
616    fn load_users(
617        &mut self,
618        request: impl RequestMessage<Response = UsersResponse>,
619        cx: &mut ModelContext<Self>,
620    ) -> Task<Result<Vec<Arc<User>>>> {
621        let client = self.client.clone();
622        let http = self.http.clone();
623        cx.spawn_weak(|this, mut cx| async move {
624            if let Some(rpc) = client.upgrade() {
625                let response = rpc.request(request).await.context("error loading users")?;
626                let users = future::join_all(
627                    response
628                        .users
629                        .into_iter()
630                        .map(|user| User::new(user, http.as_ref())),
631                )
632                .await;
633
634                if let Some(this) = this.upgrade(&cx) {
635                    this.update(&mut cx, |this, _| {
636                        for user in &users {
637                            this.users.insert(user.id, user.clone());
638                        }
639                    });
640                }
641                Ok(users)
642            } else {
643                Ok(Vec::new())
644            }
645        })
646    }
647}
648
649impl User {
650    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
651        Arc::new(User {
652            id: message.id,
653            github_login: message.github_login,
654            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
655        })
656    }
657}
658
659impl Contact {
660    async fn from_proto(
661        contact: proto::Contact,
662        user_store: &ModelHandle<UserStore>,
663        cx: &mut AsyncAppContext,
664    ) -> Result<Self> {
665        let user = user_store
666            .update(cx, |user_store, cx| {
667                user_store.get_user(contact.user_id, cx)
668            })
669            .await?;
670        Ok(Self {
671            user,
672            online: contact.online,
673            busy: contact.busy,
674        })
675    }
676}
677
678async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
679    let mut response = http
680        .get(url, Default::default(), true)
681        .await
682        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
683
684    if !response.status().is_success() {
685        return Err(anyhow!("avatar request failed {:?}", response.status()));
686    }
687
688    let mut body = Vec::new();
689    response
690        .body_mut()
691        .read_to_end(&mut body)
692        .await
693        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
694    let format = image::guess_format(&body)?;
695    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
696    Ok(ImageData::new(image))
697}