user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
  4use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  5use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  6use postage::{prelude::Stream, sink::Sink, watch};
  7use rpc::proto::{RequestMessage, UsersResponse};
  8use std::sync::{Arc, Weak};
  9use util::TryFutureExt as _;
 10
/// A user record as cached by [`UserStore`].
///
/// Comparison traits are implemented by hand further down in this file; none
/// of them look at `avatar`.
#[derive(Debug)]
pub struct User {
    /// Identifier copied from the `id` field of the `proto::User` message.
    pub id: u64,
    /// GitHub login copied from the `proto::User` message; the contact lists
    /// in `UserStore` are kept sorted and searched by this value.
    pub github_login: String,
    /// Avatar image loaded from the message's `avatar_url`.
    /// NOTE(review): presumably `None` when the fetch fails (the error goes
    /// through `warn_on_err`) — confirm against `util::TryFutureExt`.
    pub avatar: Option<Arc<ImageData>>,
}
 17
 18impl PartialOrd for User {
 19    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 20        Some(self.cmp(&other))
 21    }
 22}
 23
 24impl Ord for User {
 25    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 26        self.github_login.cmp(&other.github_login)
 27    }
 28}
 29
 30impl PartialEq for User {
 31    fn eq(&self, other: &Self) -> bool {
 32        self.id == other.id && self.github_login == other.github_login
 33    }
 34}
 35
// Marker impl: the hand-written `PartialEq` for `User` is a full equivalence
// relation, which `Ord` requires.
impl Eq for User {}
 37
/// A contact entry, built from a `proto::Contact` message by
/// `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to, resolved from the message's `user_id`.
    pub user: Arc<User>,
    /// Online flag copied from the proto message.
    pub online: bool,
    /// One entry per project listed in the proto message.
    pub projects: Vec<ProjectMetadata>,
}
 44
/// Summary of one project attached to a [`Contact`].
#[derive(Clone, Debug, PartialEq)]
pub struct ProjectMetadata {
    /// Project id copied from the proto message.
    pub id: u64,
    /// Worktree root names copied from the proto message; a project with an
    /// empty list is skipped by `Contact::non_empty_projects`.
    pub visible_worktree_root_names: Vec<String>,
    /// Users resolved from the project's guest ids, ordered by `User`'s `Ord`.
    pub guests: BTreeSet<Arc<User>>,
}
 51
/// Relationship between the store's contact lists and a given user, as
/// reported by `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
 59
/// Model holding the cached users, the current user, and the contact and
/// contact-request lists.
pub struct UserStore {
    /// Cache of loaded users keyed by user id; filled by `load_users`.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the current user; the sending half lives in the
    /// `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login (inserted via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users a contact request was sent to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; an entry is removed
    /// when its count drops to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received via `proto::UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded on demand for each request.
    client: Weak<Client>,
    /// HTTP client passed to `User::new` for loading avatars.
    http: Arc<dyn HttpClient>,
    /// Task applying queued `UpdateContacts` messages; held for ownership only.
    _maintain_contacts: Task<()>,
    /// Task reacting to client status changes; held for ownership only.
    _maintain_current_user: Task<()>,
}
 74
/// Invite details stored from the payload of a `proto::UpdateInviteInfo`
/// message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the payload.
    /// NOTE(review): exact meaning (remaining vs. used invites) is not visible
    /// in this file — confirm against the server.
    pub count: u32,
    /// `url` field of the payload.
    pub url: Arc<str>,
}
 80
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Emitted while applying a contacts update; `kind` says what happened
    /// with respect to `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
 88
/// What happened to the user carried by an `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming request arrived with `should_notify` set.
    Requested,
    /// A contact was added or updated with `should_notify` set.
    Accepted,
    /// An incoming request from this user was removed.
    Cancelled,
}
 95
// Registers `UserStore` as a gpui entity whose emitted events are `Event`.
impl Entity for UserStore {
    type Event = Event;
}
 99
/// Messages queued on `update_contacts_tx` and applied, one at a time, by
/// `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// No-op marker: the barrier is dropped once this message is reached,
    /// which lets `contact_updates_done` resolve.
    Wait(postage::barrier::Sender),
    /// Empty the contact and request lists, then drop the barrier so that
    /// `clear_contacts` resolves.
    Clear(postage::barrier::Sender),
}
105
106impl UserStore {
    /// Creates the store and starts its two background tasks.
    ///
    /// Registers message handlers on `client` for contact updates, invite-info
    /// updates and show-contacts requests, then spawns:
    /// - `_maintain_contacts`, which applies queued `UpdateContacts` messages
    ///   strictly one after another, and
    /// - `_maintain_current_user`, which reacts to client status changes.
    ///
    /// The store itself only keeps a weak reference to `client`.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        // Each handler is registered against this model's own handle.
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Moved into the task so the subscriptions live exactly as long
                // as the task does.
                // NOTE(review): presumably dropping them unregisters the
                // handlers — confirm against `Client::add_message_handler`.
                let _subscriptions = rpc_subscriptions;
                // Each update is awaited before the next message is read, so
                // updates are applied in the order they were queued.
                while let Some(message) = update_contacts_rx.next().await {
                    // Once the store is gone, messages are drained but ignored.
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.recv().await {
                    match status {
                        Status::Connected { .. } => {
                            // Only fetch when the store is still alive and the
                            // client reports a user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                // NOTE(review): `log_err` presumably turns the
                                // `Result` into an `Option`, so a failed fetch
                                // publishes `None` — confirm.
                                let user = this
                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
                                    .log_err()
                                    .await;
                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            // Signing out resets the current user and the contacts.
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        Status::ConnectionLost => {
                            // Losing the connection clears the contacts but
                            // leaves the current user in place.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
170
171    async fn handle_update_invite_info(
172        this: ModelHandle<Self>,
173        message: TypedEnvelope<proto::UpdateInviteInfo>,
174        _: Arc<Client>,
175        mut cx: AsyncAppContext,
176    ) -> Result<()> {
177        this.update(&mut cx, |this, cx| {
178            this.invite_info = Some(InviteInfo {
179                url: Arc::from(message.payload.url),
180                count: message.payload.count,
181            });
182            cx.notify();
183        });
184        Ok(())
185    }
186
187    async fn handle_show_contacts(
188        this: ModelHandle<Self>,
189        _: TypedEnvelope<proto::ShowContacts>,
190        _: Arc<Client>,
191        mut cx: AsyncAppContext,
192    ) -> Result<()> {
193        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
194        Ok(())
195    }
196
197    pub fn invite_info(&self) -> Option<&InviteInfo> {
198        self.invite_info.as_ref()
199    }
200
201    async fn handle_update_contacts(
202        this: ModelHandle<Self>,
203        message: TypedEnvelope<proto::UpdateContacts>,
204        _: Arc<Client>,
205        mut cx: AsyncAppContext,
206    ) -> Result<()> {
207        this.update(&mut cx, |this, _| {
208            this.update_contacts_tx
209                .unbounded_send(UpdateContacts::Update(message.payload))
210                .unwrap();
211        });
212        Ok(())
213    }
214
215    fn update_contacts(
216        &mut self,
217        message: UpdateContacts,
218        cx: &mut ModelContext<Self>,
219    ) -> Task<Result<()>> {
220        match message {
221            UpdateContacts::Wait(barrier) => {
222                drop(barrier);
223                Task::ready(Ok(()))
224            }
225            UpdateContacts::Clear(barrier) => {
226                self.contacts.clear();
227                self.incoming_contact_requests.clear();
228                self.outgoing_contact_requests.clear();
229                drop(barrier);
230                Task::ready(Ok(()))
231            }
232            UpdateContacts::Update(message) => {
233                log::info!(
234                    "update contacts on client {}: {:?}",
235                    self.client.upgrade().unwrap().id,
236                    message
237                );
238                let mut user_ids = HashSet::default();
239                for contact in &message.contacts {
240                    user_ids.insert(contact.user_id);
241                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
242                }
243                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
244                user_ids.extend(message.outgoing_requests.iter());
245
246                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
247                cx.spawn(|this, mut cx| async move {
248                    load_users.await?;
249
250                    // Users are fetched in parallel above and cached in call to get_users
251                    // No need to paralellize here
252                    let mut updated_contacts = Vec::new();
253                    for contact in message.contacts {
254                        let should_notify = contact.should_notify;
255                        updated_contacts.push((
256                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
257                            should_notify,
258                        ));
259                    }
260
261                    let mut incoming_requests = Vec::new();
262                    for request in message.incoming_requests {
263                        incoming_requests.push({
264                            let user = this
265                                .update(&mut cx, |this, cx| {
266                                    this.fetch_user(request.requester_id, cx)
267                                })
268                                .await?;
269                            (user, request.should_notify)
270                        });
271                    }
272
273                    let mut outgoing_requests = Vec::new();
274                    for requested_user_id in message.outgoing_requests {
275                        outgoing_requests.push(
276                            this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
277                                .await?,
278                        );
279                    }
280
281                    let removed_contacts =
282                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
283                    let removed_incoming_requests =
284                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
285                    let removed_outgoing_requests =
286                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
287
288                    this.update(&mut cx, |this, cx| {
289                        // Remove contacts
290                        this.contacts
291                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
292                        // Update existing contacts and insert new ones
293                        for (updated_contact, should_notify) in updated_contacts {
294                            if should_notify {
295                                cx.emit(Event::Contact {
296                                    user: updated_contact.user.clone(),
297                                    kind: ContactEventKind::Accepted,
298                                });
299                            }
300                            match this.contacts.binary_search_by_key(
301                                &&updated_contact.user.github_login,
302                                |contact| &contact.user.github_login,
303                            ) {
304                                Ok(ix) => this.contacts[ix] = updated_contact,
305                                Err(ix) => this.contacts.insert(ix, updated_contact),
306                            }
307                        }
308
309                        // Remove incoming contact requests
310                        this.incoming_contact_requests.retain(|user| {
311                            if removed_incoming_requests.contains(&user.id) {
312                                cx.emit(Event::Contact {
313                                    user: user.clone(),
314                                    kind: ContactEventKind::Cancelled,
315                                });
316                                false
317                            } else {
318                                true
319                            }
320                        });
321                        // Update existing incoming requests and insert new ones
322                        for (user, should_notify) in incoming_requests {
323                            if should_notify {
324                                cx.emit(Event::Contact {
325                                    user: user.clone(),
326                                    kind: ContactEventKind::Requested,
327                                });
328                            }
329
330                            match this
331                                .incoming_contact_requests
332                                .binary_search_by_key(&&user.github_login, |contact| {
333                                    &contact.github_login
334                                }) {
335                                Ok(ix) => this.incoming_contact_requests[ix] = user,
336                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
337                            }
338                        }
339
340                        // Remove outgoing contact requests
341                        this.outgoing_contact_requests
342                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
343                        // Update existing incoming requests and insert new ones
344                        for request in outgoing_requests {
345                            match this
346                                .outgoing_contact_requests
347                                .binary_search_by_key(&&request.github_login, |contact| {
348                                    &contact.github_login
349                                }) {
350                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
351                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
352                            }
353                        }
354
355                        cx.notify();
356                    });
357
358                    Ok(())
359                })
360            }
361        }
362    }
363
364    pub fn contacts(&self) -> &[Arc<Contact>] {
365        &self.contacts
366    }
367
368    pub fn has_contact(&self, user: &Arc<User>) -> bool {
369        self.contacts
370            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
371            .is_ok()
372    }
373
374    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
375        &self.incoming_contact_requests
376    }
377
378    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
379        &self.outgoing_contact_requests
380    }
381
382    pub fn is_contact_request_pending(&self, user: &User) -> bool {
383        self.pending_contact_requests.contains_key(&user.id)
384    }
385
386    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
387        if self
388            .contacts
389            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
390            .is_ok()
391        {
392            ContactRequestStatus::RequestAccepted
393        } else if self
394            .outgoing_contact_requests
395            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
396            .is_ok()
397        {
398            ContactRequestStatus::RequestSent
399        } else if self
400            .incoming_contact_requests
401            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
402            .is_ok()
403        {
404            ContactRequestStatus::RequestReceived
405        } else {
406            ContactRequestStatus::None
407        }
408    }
409
410    pub fn request_contact(
411        &mut self,
412        responder_id: u64,
413        cx: &mut ModelContext<Self>,
414    ) -> Task<Result<()>> {
415        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
416    }
417
418    pub fn remove_contact(
419        &mut self,
420        user_id: u64,
421        cx: &mut ModelContext<Self>,
422    ) -> Task<Result<()>> {
423        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
424    }
425
426    pub fn respond_to_contact_request(
427        &mut self,
428        requester_id: u64,
429        accept: bool,
430        cx: &mut ModelContext<Self>,
431    ) -> Task<Result<()>> {
432        self.perform_contact_request(
433            requester_id,
434            proto::RespondToContactRequest {
435                requester_id,
436                response: if accept {
437                    proto::ContactRequestResponse::Accept
438                } else {
439                    proto::ContactRequestResponse::Decline
440                } as i32,
441            },
442            cx,
443        )
444    }
445
446    pub fn dismiss_contact_request(
447        &mut self,
448        requester_id: u64,
449        cx: &mut ModelContext<Self>,
450    ) -> Task<Result<()>> {
451        let client = self.client.upgrade();
452        cx.spawn_weak(|_, _| async move {
453            client
454                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
455                .request(proto::RespondToContactRequest {
456                    requester_id,
457                    response: proto::ContactRequestResponse::Dismiss as i32,
458                })
459                .await?;
460            Ok(())
461        })
462    }
463
464    fn perform_contact_request<T: RequestMessage>(
465        &mut self,
466        user_id: u64,
467        request: T,
468        cx: &mut ModelContext<Self>,
469    ) -> Task<Result<()>> {
470        let client = self.client.upgrade();
471        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
472        cx.notify();
473
474        cx.spawn(|this, mut cx| async move {
475            let response = client
476                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
477                .request(request)
478                .await;
479            this.update(&mut cx, |this, cx| {
480                if let Entry::Occupied(mut request_count) =
481                    this.pending_contact_requests.entry(user_id)
482                {
483                    *request_count.get_mut() -= 1;
484                    if *request_count.get() == 0 {
485                        request_count.remove();
486                    }
487                }
488                cx.notify();
489            });
490            response?;
491            Ok(())
492        })
493    }
494
495    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
496        let (tx, mut rx) = postage::barrier::channel();
497        self.update_contacts_tx
498            .unbounded_send(UpdateContacts::Clear(tx))
499            .unwrap();
500        async move {
501            rx.recv().await;
502        }
503    }
504
505    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
506        let (tx, mut rx) = postage::barrier::channel();
507        self.update_contacts_tx
508            .unbounded_send(UpdateContacts::Wait(tx))
509            .unwrap();
510        async move {
511            rx.recv().await;
512        }
513    }
514
515    pub fn get_users(
516        &mut self,
517        mut user_ids: Vec<u64>,
518        cx: &mut ModelContext<Self>,
519    ) -> Task<Result<()>> {
520        user_ids.retain(|id| !self.users.contains_key(id));
521        if user_ids.is_empty() {
522            Task::ready(Ok(()))
523        } else {
524            let load = self.load_users(proto::GetUsers { user_ids }, cx);
525            cx.foreground().spawn(async move {
526                load.await?;
527                Ok(())
528            })
529        }
530    }
531
532    pub fn fuzzy_search_users(
533        &mut self,
534        query: String,
535        cx: &mut ModelContext<Self>,
536    ) -> Task<Result<Vec<Arc<User>>>> {
537        self.load_users(proto::FuzzySearchUsers { query }, cx)
538    }
539
540    pub fn fetch_user(
541        &mut self,
542        user_id: u64,
543        cx: &mut ModelContext<Self>,
544    ) -> Task<Result<Arc<User>>> {
545        if let Some(user) = self.users.get(&user_id).cloned() {
546            return cx.foreground().spawn(async move { Ok(user) });
547        }
548
549        let load_users = self.get_users(vec![user_id], cx);
550        cx.spawn(|this, mut cx| async move {
551            load_users.await?;
552            this.update(&mut cx, |this, _| {
553                this.users
554                    .get(&user_id)
555                    .cloned()
556                    .ok_or_else(|| anyhow!("server responded with no users"))
557            })
558        })
559    }
560
561    pub fn current_user(&self) -> Option<Arc<User>> {
562        self.current_user.borrow().clone()
563    }
564
565    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
566        self.current_user.clone()
567    }
568
569    fn load_users(
570        &mut self,
571        request: impl RequestMessage<Response = UsersResponse>,
572        cx: &mut ModelContext<Self>,
573    ) -> Task<Result<Vec<Arc<User>>>> {
574        let client = self.client.clone();
575        let http = self.http.clone();
576        cx.spawn_weak(|this, mut cx| async move {
577            if let Some(rpc) = client.upgrade() {
578                let response = rpc.request(request).await.context("error loading users")?;
579                let users = future::join_all(
580                    response
581                        .users
582                        .into_iter()
583                        .map(|user| User::new(user, http.as_ref())),
584                )
585                .await;
586
587                if let Some(this) = this.upgrade(&cx) {
588                    this.update(&mut cx, |this, _| {
589                        for user in &users {
590                            this.users.insert(user.id, user.clone());
591                        }
592                    });
593                }
594                Ok(users)
595            } else {
596                Ok(Vec::new())
597            }
598        })
599    }
600}
601
602impl User {
603    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
604        Arc::new(User {
605            id: message.id,
606            github_login: message.github_login,
607            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
608        })
609    }
610}
611
612impl Contact {
613    async fn from_proto(
614        contact: proto::Contact,
615        user_store: &ModelHandle<UserStore>,
616        cx: &mut AsyncAppContext,
617    ) -> Result<Self> {
618        let user = user_store
619            .update(cx, |user_store, cx| {
620                user_store.fetch_user(contact.user_id, cx)
621            })
622            .await?;
623        let mut projects = Vec::new();
624        for project in contact.projects {
625            let mut guests = BTreeSet::new();
626            for participant_id in project.guests {
627                guests.insert(
628                    user_store
629                        .update(cx, |user_store, cx| {
630                            user_store.fetch_user(participant_id, cx)
631                        })
632                        .await?,
633                );
634            }
635            projects.push(ProjectMetadata {
636                id: project.id,
637                visible_worktree_root_names: project.visible_worktree_root_names.clone(),
638                guests,
639            });
640        }
641        Ok(Self {
642            user,
643            online: contact.online,
644            projects,
645        })
646    }
647
648    pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
649        self.projects
650            .iter()
651            .filter(|project| !project.visible_worktree_root_names.is_empty())
652    }
653}
654
655async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
656    let mut response = http
657        .get(url, Default::default(), true)
658        .await
659        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
660
661    if !response.status().is_success() {
662        return Err(anyhow!("avatar request failed {:?}", response.status()));
663    }
664
665    let mut body = Vec::new();
666    response
667        .body_mut()
668        .read_to_end(&mut body)
669        .await
670        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
671    let format = image::guess_format(&body)?;
672    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
673    Ok(ImageData::new(image))
674}