user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags::FeatureFlagAppExt;
  5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  7use postage::{sink::Sink, watch};
  8use rpc::proto::{RequestMessage, UsersResponse};
  9use std::sync::{Arc, Weak};
 10use util::http::HttpClient;
 11use util::TryFutureExt as _;
 12
/// Numeric identifier of a user, carried as a plain `u64`.
pub type UserId = u64;
 14
/// A user record as cached by [`UserStore`].
#[derive(Default, Debug)]
pub struct User {
    /// Identifier of the user; the store's user cache is keyed by it.
    pub id: UserId,
    /// GitHub login; the store keeps its contact and request lists sorted by this value.
    pub github_login: String,
    /// Avatar image, or `None` when `fetch_avatar` returned an error while the user was built.
    pub avatar: Option<Arc<ImageData>>,
}
 21
 22impl PartialOrd for User {
 23    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 24        Some(self.cmp(other))
 25    }
 26}
 27
 28impl Ord for User {
 29    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 30        self.github_login.cmp(&other.github_login)
 31    }
 32}
 33
 34impl PartialEq for User {
 35    fn eq(&self, other: &Self) -> bool {
 36        self.id == other.id && self.github_login == other.github_login
 37    }
 38}
 39
 40impl Eq for User {}
 41
/// An entry of the contact list held by [`UserStore`].
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this entry refers to.
    pub user: Arc<User>,
    /// Copied from the `online` field of the `proto::Contact` message.
    pub online: bool,
    /// Copied from the `busy` field of the `proto::Contact` message.
    pub busy: bool,
}
 48
/// Result of `UserStore::contact_request_status` for a given user.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact or request lists.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is in the store's contact list.
    RequestAccepted,
}
 56
/// Model that caches users and tracks the current user, contacts and contact requests.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Receiving half of the watch channel written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login (looked up with binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users to whom a contact request was sent, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed when they reach zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite information received via `proto::UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded each time a request is made.
    client: Weak<Client>,
    /// HTTP client passed to `User::new` for fetching avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    // NOTE(review): presumably held here so the task lives as long as the store — confirm
    // against gpui's `Task` drop semantics.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes (see `UserStore::new`).
    _maintain_current_user: Task<()>,
}
 71
/// Invite information taken from the payload of a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The payload's `count` field.
    // NOTE(review): presumably the number of invites — confirm against the server protocol.
    pub count: u32,
    /// The payload's `url` field.
    pub url: Arc<str>,
}
 77
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Emitted while a contact update is applied; `kind` says what happened to `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
 85
 86#[derive(Clone, Copy)]
 87pub enum ContactEventKind {
 88    Requested,
 89    Accepted,
 90    Cancelled,
 91}
 92
// Makes `UserStore` a gpui entity whose emitted events are `Event` values.
impl Entity for UserStore {
    type Event = Event;
}
 96
/// Messages placed on the contact-update queue and handled by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
102
103impl UserStore {
    /// Creates the store and starts its two background tasks.
    ///
    /// Registers message handlers on `client` for contact updates, invite-info updates and
    /// show-contacts messages, then spawns:
    /// - `_maintain_contacts`, which applies queued `UpdateContacts` messages in order, and
    /// - `_maintain_current_user`, which follows the client's connection status.
    ///
    /// Only a weak reference to `client` is stored in the returned value; the
    /// `_maintain_current_user` task captures the `Arc` itself.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Move the subscriptions into the task so they are held for as long as it runs.
                // NOTE(review): presumably dropping them unregisters the handlers — confirm.
                let _subscriptions = rpc_subscriptions;
                // Each message is fully applied (awaited) before the next one is taken.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            // Nothing happens unless the store is still alive and the client
                            // reports a user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                // Both lookups run concurrently; `log_err` turns each result
                                // into an `Option`.
                                let fetch_user = this
                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
                                    .log_err();
                                let fetch_metrics_id =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);

                                if let Some(info) = info {
                                    cx.update(|cx| {
                                        cx.update_flags(info.staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            info.staff,
                                            cx,
                                        )
                                    });
                                } else {
                                    // No private user info available: report no metrics id
                                    // and non-staff to telemetry.
                                    cx.read(|cx| {
                                        client
                                            .telemetry
                                            .set_authenticated_user_info(None, false, cx)
                                    });
                                }

                                // `user` is `None` when fetching the user failed.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| {
                                    cx.notify();
                                });
                            }
                        }
                        Status::SignedOut => {
                            // Publish "no current user", then wait for the contacts to clear.
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, cx| {
                                    cx.notify();
                                    this.clear_contacts()
                                })
                                .await;
                            }
                        }
                        Status::ConnectionLost => {
                            // Contacts are cleared, but the current user is left as is.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, cx| {
                                    cx.notify();
                                    this.clear_contacts()
                                })
                                .await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
199
    /// Empties the cache of loaded users. Only compiled with the `test-support` feature.
    #[cfg(feature = "test-support")]
    pub fn clear_cache(&mut self) {
        self.users.clear();
    }
204
205    async fn handle_update_invite_info(
206        this: ModelHandle<Self>,
207        message: TypedEnvelope<proto::UpdateInviteInfo>,
208        _: Arc<Client>,
209        mut cx: AsyncAppContext,
210    ) -> Result<()> {
211        this.update(&mut cx, |this, cx| {
212            this.invite_info = Some(InviteInfo {
213                url: Arc::from(message.payload.url),
214                count: message.payload.count,
215            });
216            cx.notify();
217        });
218        Ok(())
219    }
220
221    async fn handle_show_contacts(
222        this: ModelHandle<Self>,
223        _: TypedEnvelope<proto::ShowContacts>,
224        _: Arc<Client>,
225        mut cx: AsyncAppContext,
226    ) -> Result<()> {
227        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
228        Ok(())
229    }
230
    /// Returns the most recently received invite information, or `None` if none has arrived yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
234
235    async fn handle_update_contacts(
236        this: ModelHandle<Self>,
237        message: TypedEnvelope<proto::UpdateContacts>,
238        _: Arc<Client>,
239        mut cx: AsyncAppContext,
240    ) -> Result<()> {
241        this.update(&mut cx, |this, _| {
242            this.update_contacts_tx
243                .unbounded_send(UpdateContacts::Update(message.payload))
244                .unwrap();
245        });
246        Ok(())
247    }
248
    /// Applies one message taken from the contact-update queue.
    ///
    /// - `Wait`: drops the barrier sender and does nothing else.
    /// - `Clear`: empties contacts and both request lists, then drops the barrier sender.
    /// - `Update`: loads every referenced user, then applies removals and insertions to the
    ///   three lists, emitting `Event::Contact` where appropriate.
    ///
    /// The returned task fails if any referenced user cannot be loaded; in that case none of
    /// the lists have been modified, since all loading happens before the final update.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the id of every user mentioned by the message so they can be
                // loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in the call to get_users.
                    // No need to parallelize here.
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Everything below runs inside a single `update` call on the store.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // The list is sorted by GitHub login: replace on a hit, otherwise
                            // insert at the position that keeps it sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
389
390    pub fn contacts(&self) -> &[Arc<Contact>] {
391        &self.contacts
392    }
393
394    pub fn has_contact(&self, user: &Arc<User>) -> bool {
395        self.contacts
396            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
397            .is_ok()
398    }
399
400    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
401        &self.incoming_contact_requests
402    }
403
404    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
405        &self.outgoing_contact_requests
406    }
407
408    pub fn is_contact_request_pending(&self, user: &User) -> bool {
409        self.pending_contact_requests.contains_key(&user.id)
410    }
411
412    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
413        if self
414            .contacts
415            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
416            .is_ok()
417        {
418            ContactRequestStatus::RequestAccepted
419        } else if self
420            .outgoing_contact_requests
421            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
422            .is_ok()
423        {
424            ContactRequestStatus::RequestSent
425        } else if self
426            .incoming_contact_requests
427            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
428            .is_ok()
429        {
430            ContactRequestStatus::RequestReceived
431        } else {
432            ContactRequestStatus::None
433        }
434    }
435
436    pub fn request_contact(
437        &mut self,
438        responder_id: u64,
439        cx: &mut ModelContext<Self>,
440    ) -> Task<Result<()>> {
441        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
442    }
443
444    pub fn remove_contact(
445        &mut self,
446        user_id: u64,
447        cx: &mut ModelContext<Self>,
448    ) -> Task<Result<()>> {
449        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
450    }
451
452    pub fn respond_to_contact_request(
453        &mut self,
454        requester_id: u64,
455        accept: bool,
456        cx: &mut ModelContext<Self>,
457    ) -> Task<Result<()>> {
458        self.perform_contact_request(
459            requester_id,
460            proto::RespondToContactRequest {
461                requester_id,
462                response: if accept {
463                    proto::ContactRequestResponse::Accept
464                } else {
465                    proto::ContactRequestResponse::Decline
466                } as i32,
467            },
468            cx,
469        )
470    }
471
472    pub fn dismiss_contact_request(
473        &mut self,
474        requester_id: u64,
475        cx: &mut ModelContext<Self>,
476    ) -> Task<Result<()>> {
477        let client = self.client.upgrade();
478        cx.spawn_weak(|_, _| async move {
479            client
480                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
481                .request(proto::RespondToContactRequest {
482                    requester_id,
483                    response: proto::ContactRequestResponse::Dismiss as i32,
484                })
485                .await?;
486            Ok(())
487        })
488    }
489
490    fn perform_contact_request<T: RequestMessage>(
491        &mut self,
492        user_id: u64,
493        request: T,
494        cx: &mut ModelContext<Self>,
495    ) -> Task<Result<()>> {
496        let client = self.client.upgrade();
497        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
498        cx.notify();
499
500        cx.spawn(|this, mut cx| async move {
501            let response = client
502                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
503                .request(request)
504                .await;
505            this.update(&mut cx, |this, cx| {
506                if let Entry::Occupied(mut request_count) =
507                    this.pending_contact_requests.entry(user_id)
508                {
509                    *request_count.get_mut() -= 1;
510                    if *request_count.get() == 0 {
511                        request_count.remove();
512                    }
513                }
514                cx.notify();
515            });
516            response?;
517            Ok(())
518        })
519    }
520
521    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
522        let (tx, mut rx) = postage::barrier::channel();
523        self.update_contacts_tx
524            .unbounded_send(UpdateContacts::Clear(tx))
525            .unwrap();
526        async move {
527            rx.next().await;
528        }
529    }
530
531    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
532        let (tx, mut rx) = postage::barrier::channel();
533        self.update_contacts_tx
534            .unbounded_send(UpdateContacts::Wait(tx))
535            .unwrap();
536        async move {
537            rx.next().await;
538        }
539    }
540
541    pub fn get_users(
542        &mut self,
543        user_ids: Vec<u64>,
544        cx: &mut ModelContext<Self>,
545    ) -> Task<Result<Vec<Arc<User>>>> {
546        let mut user_ids_to_fetch = user_ids.clone();
547        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
548
549        cx.spawn(|this, mut cx| async move {
550            if !user_ids_to_fetch.is_empty() {
551                this.update(&mut cx, |this, cx| {
552                    this.load_users(
553                        proto::GetUsers {
554                            user_ids: user_ids_to_fetch,
555                        },
556                        cx,
557                    )
558                })
559                .await?;
560            }
561
562            this.read_with(&cx, |this, _| {
563                user_ids
564                    .iter()
565                    .map(|user_id| {
566                        this.users
567                            .get(user_id)
568                            .cloned()
569                            .ok_or_else(|| anyhow!("user {} not found", user_id))
570                    })
571                    .collect()
572            })
573        })
574    }
575
576    pub fn fuzzy_search_users(
577        &mut self,
578        query: String,
579        cx: &mut ModelContext<Self>,
580    ) -> Task<Result<Vec<Arc<User>>>> {
581        self.load_users(proto::FuzzySearchUsers { query }, cx)
582    }
583
584    pub fn get_user(
585        &mut self,
586        user_id: u64,
587        cx: &mut ModelContext<Self>,
588    ) -> Task<Result<Arc<User>>> {
589        if let Some(user) = self.users.get(&user_id).cloned() {
590            return cx.foreground().spawn(async move { Ok(user) });
591        }
592
593        let load_users = self.get_users(vec![user_id], cx);
594        cx.spawn(|this, mut cx| async move {
595            load_users.await?;
596            this.update(&mut cx, |this, _| {
597                this.users
598                    .get(&user_id)
599                    .cloned()
600                    .ok_or_else(|| anyhow!("server responded with no users"))
601            })
602        })
603    }
604
605    pub fn current_user(&self) -> Option<Arc<User>> {
606        self.current_user.borrow().clone()
607    }
608
609    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
610        self.current_user.clone()
611    }
612
613    fn load_users(
614        &mut self,
615        request: impl RequestMessage<Response = UsersResponse>,
616        cx: &mut ModelContext<Self>,
617    ) -> Task<Result<Vec<Arc<User>>>> {
618        let client = self.client.clone();
619        let http = self.http.clone();
620        cx.spawn_weak(|this, mut cx| async move {
621            if let Some(rpc) = client.upgrade() {
622                let response = rpc.request(request).await.context("error loading users")?;
623                let users = future::join_all(
624                    response
625                        .users
626                        .into_iter()
627                        .map(|user| User::new(user, http.as_ref())),
628                )
629                .await;
630
631                if let Some(this) = this.upgrade(&cx) {
632                    this.update(&mut cx, |this, _| {
633                        for user in &users {
634                            this.users.insert(user.id, user.clone());
635                        }
636                    });
637                }
638                Ok(users)
639            } else {
640                Ok(Vec::new())
641            }
642        })
643    }
644}
645
646impl User {
647    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
648        Arc::new(User {
649            id: message.id,
650            github_login: message.github_login,
651            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
652        })
653    }
654}
655
656impl Contact {
657    async fn from_proto(
658        contact: proto::Contact,
659        user_store: &ModelHandle<UserStore>,
660        cx: &mut AsyncAppContext,
661    ) -> Result<Self> {
662        let user = user_store
663            .update(cx, |user_store, cx| {
664                user_store.get_user(contact.user_id, cx)
665            })
666            .await?;
667        Ok(Self {
668            user,
669            online: contact.online,
670            busy: contact.busy,
671        })
672    }
673}
674
675async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
676    let mut response = http
677        .get(url, Default::default(), true)
678        .await
679        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
680
681    if !response.status().is_success() {
682        return Err(anyhow!("avatar request failed {:?}", response.status()));
683    }
684
685    let mut body = Vec::new();
686    response
687        .body_mut()
688        .read_to_end(&mut body)
689        .await
690        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
691    let format = image::guess_format(&body)?;
692    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
693    Ok(ImageData::new(image))
694}