user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
  4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  6use postage::{prelude::Stream, sink::Sink, watch};
  7use rpc::proto::{RequestMessage, UsersResponse};
  8use std::sync::{Arc, Weak};
  9use util::TryFutureExt as _;
 10
 11#[derive(Debug)]
 12pub struct User {
 13    pub id: u64,
 14    pub github_login: String,
 15    pub avatar: Option<Arc<ImageData>>,
 16}
 17
 18impl PartialOrd for User {
 19    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 20        Some(self.cmp(other))
 21    }
 22}
 23
 24impl Ord for User {
 25    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 26        self.github_login.cmp(&other.github_login)
 27    }
 28}
 29
 30impl PartialEq for User {
 31    fn eq(&self, other: &Self) -> bool {
 32        self.id == other.id && self.github_login == other.github_login
 33    }
 34}
 35
 36impl Eq for User {}
 37
 38#[derive(Debug, PartialEq)]
 39pub struct Contact {
 40    pub user: Arc<User>,
 41    pub online: bool,
 42    pub projects: Vec<ProjectMetadata>,
 43}
 44
 45#[derive(Clone, Debug, PartialEq)]
 46pub struct ProjectMetadata {
 47    pub id: u64,
 48    pub visible_worktree_root_names: Vec<String>,
 49    pub guests: BTreeSet<Arc<User>>,
 50}
 51
 52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 53pub enum ContactRequestStatus {
 54    None,
 55    RequestSent,
 56    RequestReceived,
 57    RequestAccepted,
 58}
 59
 60pub struct UserStore {
 61    users: HashMap<u64, Arc<User>>,
 62    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 63    current_user: watch::Receiver<Option<Arc<User>>>,
 64    contacts: Vec<Arc<Contact>>,
 65    incoming_contact_requests: Vec<Arc<User>>,
 66    outgoing_contact_requests: Vec<Arc<User>>,
 67    pending_contact_requests: HashMap<u64, usize>,
 68    invite_info: Option<InviteInfo>,
 69    client: Weak<Client>,
 70    http: Arc<dyn HttpClient>,
 71    _maintain_contacts: Task<()>,
 72    _maintain_current_user: Task<()>,
 73}
 74
 75#[derive(Clone)]
 76pub struct InviteInfo {
 77    pub count: u32,
 78    pub url: Arc<str>,
 79}
 80
 81pub enum Event {
 82    Contact {
 83        user: Arc<User>,
 84        kind: ContactEventKind,
 85    },
 86    ShowContacts,
 87}
 88
 89#[derive(Clone, Copy)]
 90pub enum ContactEventKind {
 91    Requested,
 92    Accepted,
 93    Cancelled,
 94}
 95
 96impl Entity for UserStore {
 97    type Event = Event;
 98}
 99
100enum UpdateContacts {
101    Update(proto::UpdateContacts),
102    Wait(postage::barrier::Sender),
103    Clear(postage::barrier::Sender),
104}
105
106impl UserStore {
107    pub fn new(
108        client: Arc<Client>,
109        http: Arc<dyn HttpClient>,
110        cx: &mut ModelContext<Self>,
111    ) -> Self {
112        let (mut current_user_tx, current_user_rx) = watch::channel();
113        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
114        let rpc_subscriptions = vec![
115            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
116            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
117            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
118        ];
119        Self {
120            users: Default::default(),
121            current_user: current_user_rx,
122            contacts: Default::default(),
123            incoming_contact_requests: Default::default(),
124            outgoing_contact_requests: Default::default(),
125            invite_info: None,
126            client: Arc::downgrade(&client),
127            update_contacts_tx,
128            http,
129            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130                let _subscriptions = rpc_subscriptions;
131                while let Some(message) = update_contacts_rx.next().await {
132                    if let Some(this) = this.upgrade(&cx) {
133                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134                            .log_err()
135                            .await;
136                    }
137                }
138            }),
139            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140                let mut status = client.status();
141                while let Some(status) = status.recv().await {
142                    match status {
143                        Status::Connected { .. } => {
144                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145                                let fetch_user = this
146                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
147                                    .log_err();
148                                let fetch_metrics_id =
149                                    client.request(proto::GetPrivateUserInfo {}).log_err();
150                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
151                                client.telemetry.set_metrics_id(info.map(|i| i.metrics_id));
152                                client.telemetry.report_event("sign in", Default::default());
153                                current_user_tx.send(user).await.ok();
154                            }
155                        }
156                        Status::SignedOut => {
157                            current_user_tx.send(None).await.ok();
158                            if let Some(this) = this.upgrade(&cx) {
159                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
160                            }
161                        }
162                        Status::ConnectionLost => {
163                            if let Some(this) = this.upgrade(&cx) {
164                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
165                            }
166                        }
167                        _ => {}
168                    }
169                }
170            }),
171            pending_contact_requests: Default::default(),
172        }
173    }
174
175    async fn handle_update_invite_info(
176        this: ModelHandle<Self>,
177        message: TypedEnvelope<proto::UpdateInviteInfo>,
178        _: Arc<Client>,
179        mut cx: AsyncAppContext,
180    ) -> Result<()> {
181        this.update(&mut cx, |this, cx| {
182            this.invite_info = Some(InviteInfo {
183                url: Arc::from(message.payload.url),
184                count: message.payload.count,
185            });
186            cx.notify();
187        });
188        Ok(())
189    }
190
191    async fn handle_show_contacts(
192        this: ModelHandle<Self>,
193        _: TypedEnvelope<proto::ShowContacts>,
194        _: Arc<Client>,
195        mut cx: AsyncAppContext,
196    ) -> Result<()> {
197        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
198        Ok(())
199    }
200
201    pub fn invite_info(&self) -> Option<&InviteInfo> {
202        self.invite_info.as_ref()
203    }
204
205    async fn handle_update_contacts(
206        this: ModelHandle<Self>,
207        message: TypedEnvelope<proto::UpdateContacts>,
208        _: Arc<Client>,
209        mut cx: AsyncAppContext,
210    ) -> Result<()> {
211        this.update(&mut cx, |this, _| {
212            this.update_contacts_tx
213                .unbounded_send(UpdateContacts::Update(message.payload))
214                .unwrap();
215        });
216        Ok(())
217    }
218
219    fn update_contacts(
220        &mut self,
221        message: UpdateContacts,
222        cx: &mut ModelContext<Self>,
223    ) -> Task<Result<()>> {
224        match message {
225            UpdateContacts::Wait(barrier) => {
226                drop(barrier);
227                Task::ready(Ok(()))
228            }
229            UpdateContacts::Clear(barrier) => {
230                self.contacts.clear();
231                self.incoming_contact_requests.clear();
232                self.outgoing_contact_requests.clear();
233                drop(barrier);
234                Task::ready(Ok(()))
235            }
236            UpdateContacts::Update(message) => {
237                let mut user_ids = HashSet::default();
238                for contact in &message.contacts {
239                    user_ids.insert(contact.user_id);
240                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
241                }
242                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
243                user_ids.extend(message.outgoing_requests.iter());
244
245                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
246                cx.spawn(|this, mut cx| async move {
247                    load_users.await?;
248
249                    // Users are fetched in parallel above and cached in call to get_users
250                    // No need to paralellize here
251                    let mut updated_contacts = Vec::new();
252                    for contact in message.contacts {
253                        let should_notify = contact.should_notify;
254                        updated_contacts.push((
255                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
256                            should_notify,
257                        ));
258                    }
259
260                    let mut incoming_requests = Vec::new();
261                    for request in message.incoming_requests {
262                        incoming_requests.push({
263                            let user = this
264                                .update(&mut cx, |this, cx| {
265                                    this.fetch_user(request.requester_id, cx)
266                                })
267                                .await?;
268                            (user, request.should_notify)
269                        });
270                    }
271
272                    let mut outgoing_requests = Vec::new();
273                    for requested_user_id in message.outgoing_requests {
274                        outgoing_requests.push(
275                            this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
276                                .await?,
277                        );
278                    }
279
280                    let removed_contacts =
281                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
282                    let removed_incoming_requests =
283                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
284                    let removed_outgoing_requests =
285                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
286
287                    this.update(&mut cx, |this, cx| {
288                        // Remove contacts
289                        this.contacts
290                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
291                        // Update existing contacts and insert new ones
292                        for (updated_contact, should_notify) in updated_contacts {
293                            if should_notify {
294                                cx.emit(Event::Contact {
295                                    user: updated_contact.user.clone(),
296                                    kind: ContactEventKind::Accepted,
297                                });
298                            }
299                            match this.contacts.binary_search_by_key(
300                                &&updated_contact.user.github_login,
301                                |contact| &contact.user.github_login,
302                            ) {
303                                Ok(ix) => this.contacts[ix] = updated_contact,
304                                Err(ix) => this.contacts.insert(ix, updated_contact),
305                            }
306                        }
307
308                        // Remove incoming contact requests
309                        this.incoming_contact_requests.retain(|user| {
310                            if removed_incoming_requests.contains(&user.id) {
311                                cx.emit(Event::Contact {
312                                    user: user.clone(),
313                                    kind: ContactEventKind::Cancelled,
314                                });
315                                false
316                            } else {
317                                true
318                            }
319                        });
320                        // Update existing incoming requests and insert new ones
321                        for (user, should_notify) in incoming_requests {
322                            if should_notify {
323                                cx.emit(Event::Contact {
324                                    user: user.clone(),
325                                    kind: ContactEventKind::Requested,
326                                });
327                            }
328
329                            match this
330                                .incoming_contact_requests
331                                .binary_search_by_key(&&user.github_login, |contact| {
332                                    &contact.github_login
333                                }) {
334                                Ok(ix) => this.incoming_contact_requests[ix] = user,
335                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
336                            }
337                        }
338
339                        // Remove outgoing contact requests
340                        this.outgoing_contact_requests
341                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
342                        // Update existing incoming requests and insert new ones
343                        for request in outgoing_requests {
344                            match this
345                                .outgoing_contact_requests
346                                .binary_search_by_key(&&request.github_login, |contact| {
347                                    &contact.github_login
348                                }) {
349                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
350                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
351                            }
352                        }
353
354                        cx.notify();
355                    });
356
357                    Ok(())
358                })
359            }
360        }
361    }
362
363    pub fn contacts(&self) -> &[Arc<Contact>] {
364        &self.contacts
365    }
366
367    pub fn has_contact(&self, user: &Arc<User>) -> bool {
368        self.contacts
369            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
370            .is_ok()
371    }
372
373    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
374        &self.incoming_contact_requests
375    }
376
377    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
378        &self.outgoing_contact_requests
379    }
380
381    pub fn is_contact_request_pending(&self, user: &User) -> bool {
382        self.pending_contact_requests.contains_key(&user.id)
383    }
384
385    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
386        if self
387            .contacts
388            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
389            .is_ok()
390        {
391            ContactRequestStatus::RequestAccepted
392        } else if self
393            .outgoing_contact_requests
394            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
395            .is_ok()
396        {
397            ContactRequestStatus::RequestSent
398        } else if self
399            .incoming_contact_requests
400            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
401            .is_ok()
402        {
403            ContactRequestStatus::RequestReceived
404        } else {
405            ContactRequestStatus::None
406        }
407    }
408
409    pub fn request_contact(
410        &mut self,
411        responder_id: u64,
412        cx: &mut ModelContext<Self>,
413    ) -> Task<Result<()>> {
414        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
415    }
416
417    pub fn remove_contact(
418        &mut self,
419        user_id: u64,
420        cx: &mut ModelContext<Self>,
421    ) -> Task<Result<()>> {
422        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
423    }
424
425    pub fn respond_to_contact_request(
426        &mut self,
427        requester_id: u64,
428        accept: bool,
429        cx: &mut ModelContext<Self>,
430    ) -> Task<Result<()>> {
431        self.perform_contact_request(
432            requester_id,
433            proto::RespondToContactRequest {
434                requester_id,
435                response: if accept {
436                    proto::ContactRequestResponse::Accept
437                } else {
438                    proto::ContactRequestResponse::Decline
439                } as i32,
440            },
441            cx,
442        )
443    }
444
445    pub fn dismiss_contact_request(
446        &mut self,
447        requester_id: u64,
448        cx: &mut ModelContext<Self>,
449    ) -> Task<Result<()>> {
450        let client = self.client.upgrade();
451        cx.spawn_weak(|_, _| async move {
452            client
453                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
454                .request(proto::RespondToContactRequest {
455                    requester_id,
456                    response: proto::ContactRequestResponse::Dismiss as i32,
457                })
458                .await?;
459            Ok(())
460        })
461    }
462
463    fn perform_contact_request<T: RequestMessage>(
464        &mut self,
465        user_id: u64,
466        request: T,
467        cx: &mut ModelContext<Self>,
468    ) -> Task<Result<()>> {
469        let client = self.client.upgrade();
470        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
471        cx.notify();
472
473        cx.spawn(|this, mut cx| async move {
474            let response = client
475                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
476                .request(request)
477                .await;
478            this.update(&mut cx, |this, cx| {
479                if let Entry::Occupied(mut request_count) =
480                    this.pending_contact_requests.entry(user_id)
481                {
482                    *request_count.get_mut() -= 1;
483                    if *request_count.get() == 0 {
484                        request_count.remove();
485                    }
486                }
487                cx.notify();
488            });
489            response?;
490            Ok(())
491        })
492    }
493
494    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
495        let (tx, mut rx) = postage::barrier::channel();
496        self.update_contacts_tx
497            .unbounded_send(UpdateContacts::Clear(tx))
498            .unwrap();
499        async move {
500            rx.recv().await;
501        }
502    }
503
504    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
505        let (tx, mut rx) = postage::barrier::channel();
506        self.update_contacts_tx
507            .unbounded_send(UpdateContacts::Wait(tx))
508            .unwrap();
509        async move {
510            rx.recv().await;
511        }
512    }
513
514    pub fn get_users(
515        &mut self,
516        mut user_ids: Vec<u64>,
517        cx: &mut ModelContext<Self>,
518    ) -> Task<Result<()>> {
519        user_ids.retain(|id| !self.users.contains_key(id));
520        if user_ids.is_empty() {
521            Task::ready(Ok(()))
522        } else {
523            let load = self.load_users(proto::GetUsers { user_ids }, cx);
524            cx.foreground().spawn(async move {
525                load.await?;
526                Ok(())
527            })
528        }
529    }
530
531    pub fn fuzzy_search_users(
532        &mut self,
533        query: String,
534        cx: &mut ModelContext<Self>,
535    ) -> Task<Result<Vec<Arc<User>>>> {
536        self.load_users(proto::FuzzySearchUsers { query }, cx)
537    }
538
539    pub fn fetch_user(
540        &mut self,
541        user_id: u64,
542        cx: &mut ModelContext<Self>,
543    ) -> Task<Result<Arc<User>>> {
544        if let Some(user) = self.users.get(&user_id).cloned() {
545            return cx.foreground().spawn(async move { Ok(user) });
546        }
547
548        let load_users = self.get_users(vec![user_id], cx);
549        cx.spawn(|this, mut cx| async move {
550            load_users.await?;
551            this.update(&mut cx, |this, _| {
552                this.users
553                    .get(&user_id)
554                    .cloned()
555                    .ok_or_else(|| anyhow!("server responded with no users"))
556            })
557        })
558    }
559
560    pub fn current_user(&self) -> Option<Arc<User>> {
561        self.current_user.borrow().clone()
562    }
563
564    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
565        self.current_user.clone()
566    }
567
568    fn load_users(
569        &mut self,
570        request: impl RequestMessage<Response = UsersResponse>,
571        cx: &mut ModelContext<Self>,
572    ) -> Task<Result<Vec<Arc<User>>>> {
573        let client = self.client.clone();
574        let http = self.http.clone();
575        cx.spawn_weak(|this, mut cx| async move {
576            if let Some(rpc) = client.upgrade() {
577                let response = rpc.request(request).await.context("error loading users")?;
578                let users = future::join_all(
579                    response
580                        .users
581                        .into_iter()
582                        .map(|user| User::new(user, http.as_ref())),
583                )
584                .await;
585
586                if let Some(this) = this.upgrade(&cx) {
587                    this.update(&mut cx, |this, _| {
588                        for user in &users {
589                            this.users.insert(user.id, user.clone());
590                        }
591                    });
592                }
593                Ok(users)
594            } else {
595                Ok(Vec::new())
596            }
597        })
598    }
599}
600
601impl User {
602    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
603        Arc::new(User {
604            id: message.id,
605            github_login: message.github_login,
606            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
607        })
608    }
609}
610
611impl Contact {
612    async fn from_proto(
613        contact: proto::Contact,
614        user_store: &ModelHandle<UserStore>,
615        cx: &mut AsyncAppContext,
616    ) -> Result<Self> {
617        let user = user_store
618            .update(cx, |user_store, cx| {
619                user_store.fetch_user(contact.user_id, cx)
620            })
621            .await?;
622        let mut projects = Vec::new();
623        for project in contact.projects {
624            let mut guests = BTreeSet::new();
625            for participant_id in project.guests {
626                guests.insert(
627                    user_store
628                        .update(cx, |user_store, cx| {
629                            user_store.fetch_user(participant_id, cx)
630                        })
631                        .await?,
632                );
633            }
634            projects.push(ProjectMetadata {
635                id: project.id,
636                visible_worktree_root_names: project.visible_worktree_root_names.clone(),
637                guests,
638            });
639        }
640        Ok(Self {
641            user,
642            online: contact.online,
643            projects,
644        })
645    }
646
647    pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
648        self.projects
649            .iter()
650            .filter(|project| !project.visible_worktree_root_names.is_empty())
651    }
652}
653
654async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
655    let mut response = http
656        .get(url, Default::default(), true)
657        .await
658        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
659
660    if !response.status().is_success() {
661        return Err(anyhow!("avatar request failed {:?}", response.status()));
662    }
663
664    let mut body = Vec::new();
665    response
666        .body_mut()
667        .read_to_end(&mut body)
668        .await
669        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
670    let format = image::guess_format(&body)?;
671    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
672    Ok(ImageData::new(image))
673}