user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags2::FeatureFlagAppExt;
  5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  6use gpui2::{AsyncAppContext, EventEmitter, ImageData, Model, ModelContext, Task};
  7use postage::{sink::Sink, watch};
  8use rpc2::proto::{RequestMessage, UsersResponse};
  9use std::sync::{Arc, Weak};
 10use text::ReplicaId;
 11use util::http::HttpClient;
 12use util::TryFutureExt as _;
 13
 14pub type UserId = u64;
 15
 16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 17pub struct ParticipantIndex(pub u32);
 18
 19#[derive(Default, Debug)]
 20pub struct User {
 21    pub id: UserId,
 22    pub github_login: String,
 23    pub avatar: Option<Arc<ImageData>>,
 24}
 25
 26#[derive(Clone, Debug, PartialEq, Eq)]
 27pub struct Collaborator {
 28    pub peer_id: proto::PeerId,
 29    pub replica_id: ReplicaId,
 30    pub user_id: UserId,
 31}
 32
 33impl PartialOrd for User {
 34    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 35        Some(self.cmp(other))
 36    }
 37}
 38
 39impl Ord for User {
 40    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 41        self.github_login.cmp(&other.github_login)
 42    }
 43}
 44
 45impl PartialEq for User {
 46    fn eq(&self, other: &Self) -> bool {
 47        self.id == other.id && self.github_login == other.github_login
 48    }
 49}
 50
 51impl Eq for User {}
 52
 53#[derive(Debug, PartialEq)]
 54pub struct Contact {
 55    pub user: Arc<User>,
 56    pub online: bool,
 57    pub busy: bool,
 58}
 59
 60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 61pub enum ContactRequestStatus {
 62    None,
 63    RequestSent,
 64    RequestReceived,
 65    RequestAccepted,
 66}
 67
 68pub struct UserStore {
 69    users: HashMap<u64, Arc<User>>,
 70    participant_indices: HashMap<u64, ParticipantIndex>,
 71    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 72    current_user: watch::Receiver<Option<Arc<User>>>,
 73    contacts: Vec<Arc<Contact>>,
 74    incoming_contact_requests: Vec<Arc<User>>,
 75    outgoing_contact_requests: Vec<Arc<User>>,
 76    pending_contact_requests: HashMap<u64, usize>,
 77    invite_info: Option<InviteInfo>,
 78    client: Weak<Client>,
 79    http: Arc<dyn HttpClient>,
 80    _maintain_contacts: Task<()>,
 81    _maintain_current_user: Task<Result<()>>,
 82}
 83
 84#[derive(Clone)]
 85pub struct InviteInfo {
 86    pub count: u32,
 87    pub url: Arc<str>,
 88}
 89
 90pub enum Event {
 91    Contact {
 92        user: Arc<User>,
 93        kind: ContactEventKind,
 94    },
 95    ShowContacts,
 96    ParticipantIndicesChanged,
 97}
 98
 99#[derive(Clone, Copy)]
100pub enum ContactEventKind {
101    Requested,
102    Accepted,
103    Cancelled,
104}
105
106impl EventEmitter for UserStore {
107    type Event = Event;
108}
109
110enum UpdateContacts {
111    Update(proto::UpdateContacts),
112    Wait(postage::barrier::Sender),
113    Clear(postage::barrier::Sender),
114}
115
116impl UserStore {
117    pub fn new(
118        client: Arc<Client>,
119        http: Arc<dyn HttpClient>,
120        cx: &mut ModelContext<Self>,
121    ) -> Self {
122        let (mut current_user_tx, current_user_rx) = watch::channel();
123        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
124        let rpc_subscriptions = vec![
125            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
126            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
127            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
128        ];
129        Self {
130            users: Default::default(),
131            current_user: current_user_rx,
132            contacts: Default::default(),
133            incoming_contact_requests: Default::default(),
134            participant_indices: Default::default(),
135            outgoing_contact_requests: Default::default(),
136            invite_info: None,
137            client: Arc::downgrade(&client),
138            update_contacts_tx,
139            http,
140            _maintain_contacts: cx.spawn(|this, mut cx| async move {
141                let _subscriptions = rpc_subscriptions;
142                while let Some(message) = update_contacts_rx.next().await {
143                    if let Ok(task) =
144                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
145                    {
146                        task.log_err().await;
147                    } else {
148                        break;
149                    }
150                }
151            }),
152            _maintain_current_user: cx.spawn(|this, mut cx| async move {
153                let mut status = client.status();
154                while let Some(status) = status.next().await {
155                    match status {
156                        Status::Connected { .. } => {
157                            if let Some(user_id) = client.user_id() {
158                                let fetch_user = if let Ok(fetch_user) = this
159                                    .update(&mut cx, |this, cx| {
160                                        this.get_user(user_id, cx).log_err()
161                                    }) {
162                                    fetch_user
163                                } else {
164                                    break;
165                                };
166                                let fetch_metrics_id =
167                                    client.request(proto::GetPrivateUserInfo {}).log_err();
168                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
169
170                                cx.update(|cx| {
171                                    if let Some(info) = info {
172                                        cx.update_flags(info.staff, info.flags);
173                                        client.telemetry.set_authenticated_user_info(
174                                            Some(info.metrics_id.clone()),
175                                            info.staff,
176                                            cx,
177                                        )
178                                    }
179                                })?;
180
181                                current_user_tx.send(user).await.ok();
182
183                                this.update(&mut cx, |_, cx| cx.notify())?;
184                            }
185                        }
186                        Status::SignedOut => {
187                            current_user_tx.send(None).await.ok();
188                            this.update(&mut cx, |this, cx| {
189                                cx.notify();
190                                this.clear_contacts()
191                            })?
192                            .await;
193                        }
194                        Status::ConnectionLost => {
195                            this.update(&mut cx, |this, cx| {
196                                cx.notify();
197                                this.clear_contacts()
198                            })?
199                            .await;
200                        }
201                        _ => {}
202                    }
203                }
204                Ok(())
205            }),
206            pending_contact_requests: Default::default(),
207        }
208    }
209
210    #[cfg(feature = "test-support")]
211    pub fn clear_cache(&mut self) {
212        self.users.clear();
213    }
214
215    async fn handle_update_invite_info(
216        this: Model<Self>,
217        message: TypedEnvelope<proto::UpdateInviteInfo>,
218        _: Arc<Client>,
219        mut cx: AsyncAppContext,
220    ) -> Result<()> {
221        this.update(&mut cx, |this, cx| {
222            this.invite_info = Some(InviteInfo {
223                url: Arc::from(message.payload.url),
224                count: message.payload.count,
225            });
226            cx.notify();
227        })?;
228        Ok(())
229    }
230
231    async fn handle_show_contacts(
232        this: Model<Self>,
233        _: TypedEnvelope<proto::ShowContacts>,
234        _: Arc<Client>,
235        mut cx: AsyncAppContext,
236    ) -> Result<()> {
237        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
238        Ok(())
239    }
240
241    pub fn invite_info(&self) -> Option<&InviteInfo> {
242        self.invite_info.as_ref()
243    }
244
245    async fn handle_update_contacts(
246        this: Model<Self>,
247        message: TypedEnvelope<proto::UpdateContacts>,
248        _: Arc<Client>,
249        mut cx: AsyncAppContext,
250    ) -> Result<()> {
251        this.update(&mut cx, |this, _| {
252            this.update_contacts_tx
253                .unbounded_send(UpdateContacts::Update(message.payload))
254                .unwrap();
255        })?;
256        Ok(())
257    }
258
259    fn update_contacts(
260        &mut self,
261        message: UpdateContacts,
262        cx: &mut ModelContext<Self>,
263    ) -> Task<Result<()>> {
264        match message {
265            UpdateContacts::Wait(barrier) => {
266                drop(barrier);
267                Task::ready(Ok(()))
268            }
269            UpdateContacts::Clear(barrier) => {
270                self.contacts.clear();
271                self.incoming_contact_requests.clear();
272                self.outgoing_contact_requests.clear();
273                drop(barrier);
274                Task::ready(Ok(()))
275            }
276            UpdateContacts::Update(message) => {
277                let mut user_ids = HashSet::default();
278                for contact in &message.contacts {
279                    user_ids.insert(contact.user_id);
280                }
281                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
282                user_ids.extend(message.outgoing_requests.iter());
283
284                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
285                cx.spawn(|this, mut cx| async move {
286                    load_users.await?;
287
288                    // Users are fetched in parallel above and cached in call to get_users
289                    // No need to paralellize here
290                    let mut updated_contacts = Vec::new();
291                    let this = this
292                        .upgrade()
293                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
294                    for contact in message.contacts {
295                        let should_notify = contact.should_notify;
296                        updated_contacts.push((
297                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
298                            should_notify,
299                        ));
300                    }
301
302                    let mut incoming_requests = Vec::new();
303                    for request in message.incoming_requests {
304                        incoming_requests.push({
305                            let user = this
306                                .update(&mut cx, |this, cx| {
307                                    this.get_user(request.requester_id, cx)
308                                })?
309                                .await?;
310                            (user, request.should_notify)
311                        });
312                    }
313
314                    let mut outgoing_requests = Vec::new();
315                    for requested_user_id in message.outgoing_requests {
316                        outgoing_requests.push(
317                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
318                                .await?,
319                        );
320                    }
321
322                    let removed_contacts =
323                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
324                    let removed_incoming_requests =
325                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
326                    let removed_outgoing_requests =
327                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
328
329                    this.update(&mut cx, |this, cx| {
330                        // Remove contacts
331                        this.contacts
332                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
333                        // Update existing contacts and insert new ones
334                        for (updated_contact, should_notify) in updated_contacts {
335                            if should_notify {
336                                cx.emit(Event::Contact {
337                                    user: updated_contact.user.clone(),
338                                    kind: ContactEventKind::Accepted,
339                                });
340                            }
341                            match this.contacts.binary_search_by_key(
342                                &&updated_contact.user.github_login,
343                                |contact| &contact.user.github_login,
344                            ) {
345                                Ok(ix) => this.contacts[ix] = updated_contact,
346                                Err(ix) => this.contacts.insert(ix, updated_contact),
347                            }
348                        }
349
350                        // Remove incoming contact requests
351                        this.incoming_contact_requests.retain(|user| {
352                            if removed_incoming_requests.contains(&user.id) {
353                                cx.emit(Event::Contact {
354                                    user: user.clone(),
355                                    kind: ContactEventKind::Cancelled,
356                                });
357                                false
358                            } else {
359                                true
360                            }
361                        });
362                        // Update existing incoming requests and insert new ones
363                        for (user, should_notify) in incoming_requests {
364                            if should_notify {
365                                cx.emit(Event::Contact {
366                                    user: user.clone(),
367                                    kind: ContactEventKind::Requested,
368                                });
369                            }
370
371                            match this
372                                .incoming_contact_requests
373                                .binary_search_by_key(&&user.github_login, |contact| {
374                                    &contact.github_login
375                                }) {
376                                Ok(ix) => this.incoming_contact_requests[ix] = user,
377                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
378                            }
379                        }
380
381                        // Remove outgoing contact requests
382                        this.outgoing_contact_requests
383                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
384                        // Update existing incoming requests and insert new ones
385                        for request in outgoing_requests {
386                            match this
387                                .outgoing_contact_requests
388                                .binary_search_by_key(&&request.github_login, |contact| {
389                                    &contact.github_login
390                                }) {
391                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
392                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
393                            }
394                        }
395
396                        cx.notify();
397                    })?;
398
399                    Ok(())
400                })
401            }
402        }
403    }
404
405    pub fn contacts(&self) -> &[Arc<Contact>] {
406        &self.contacts
407    }
408
409    pub fn has_contact(&self, user: &Arc<User>) -> bool {
410        self.contacts
411            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
412            .is_ok()
413    }
414
415    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
416        &self.incoming_contact_requests
417    }
418
419    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
420        &self.outgoing_contact_requests
421    }
422
423    pub fn is_contact_request_pending(&self, user: &User) -> bool {
424        self.pending_contact_requests.contains_key(&user.id)
425    }
426
427    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
428        if self
429            .contacts
430            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
431            .is_ok()
432        {
433            ContactRequestStatus::RequestAccepted
434        } else if self
435            .outgoing_contact_requests
436            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
437            .is_ok()
438        {
439            ContactRequestStatus::RequestSent
440        } else if self
441            .incoming_contact_requests
442            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
443            .is_ok()
444        {
445            ContactRequestStatus::RequestReceived
446        } else {
447            ContactRequestStatus::None
448        }
449    }
450
451    pub fn request_contact(
452        &mut self,
453        responder_id: u64,
454        cx: &mut ModelContext<Self>,
455    ) -> Task<Result<()>> {
456        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
457    }
458
459    pub fn remove_contact(
460        &mut self,
461        user_id: u64,
462        cx: &mut ModelContext<Self>,
463    ) -> Task<Result<()>> {
464        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
465    }
466
467    pub fn respond_to_contact_request(
468        &mut self,
469        requester_id: u64,
470        accept: bool,
471        cx: &mut ModelContext<Self>,
472    ) -> Task<Result<()>> {
473        self.perform_contact_request(
474            requester_id,
475            proto::RespondToContactRequest {
476                requester_id,
477                response: if accept {
478                    proto::ContactRequestResponse::Accept
479                } else {
480                    proto::ContactRequestResponse::Decline
481                } as i32,
482            },
483            cx,
484        )
485    }
486
487    pub fn dismiss_contact_request(
488        &mut self,
489        requester_id: u64,
490        cx: &mut ModelContext<Self>,
491    ) -> Task<Result<()>> {
492        let client = self.client.upgrade();
493        cx.spawn(move |_, _| async move {
494            client
495                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
496                .request(proto::RespondToContactRequest {
497                    requester_id,
498                    response: proto::ContactRequestResponse::Dismiss as i32,
499                })
500                .await?;
501            Ok(())
502        })
503    }
504
505    fn perform_contact_request<T: RequestMessage>(
506        &mut self,
507        user_id: u64,
508        request: T,
509        cx: &mut ModelContext<Self>,
510    ) -> Task<Result<()>> {
511        let client = self.client.upgrade();
512        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
513        cx.notify();
514
515        cx.spawn(move |this, mut cx| async move {
516            let response = client
517                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
518                .request(request)
519                .await;
520            this.update(&mut cx, |this, cx| {
521                if let Entry::Occupied(mut request_count) =
522                    this.pending_contact_requests.entry(user_id)
523                {
524                    *request_count.get_mut() -= 1;
525                    if *request_count.get() == 0 {
526                        request_count.remove();
527                    }
528                }
529                cx.notify();
530            })?;
531            response?;
532            Ok(())
533        })
534    }
535
536    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
537        let (tx, mut rx) = postage::barrier::channel();
538        self.update_contacts_tx
539            .unbounded_send(UpdateContacts::Clear(tx))
540            .unwrap();
541        async move {
542            rx.next().await;
543        }
544    }
545
546    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
547        let (tx, mut rx) = postage::barrier::channel();
548        self.update_contacts_tx
549            .unbounded_send(UpdateContacts::Wait(tx))
550            .unwrap();
551        async move {
552            rx.next().await;
553        }
554    }
555
556    pub fn get_users(
557        &mut self,
558        user_ids: Vec<u64>,
559        cx: &mut ModelContext<Self>,
560    ) -> Task<Result<Vec<Arc<User>>>> {
561        let mut user_ids_to_fetch = user_ids.clone();
562        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
563
564        cx.spawn(|this, mut cx| async move {
565            if !user_ids_to_fetch.is_empty() {
566                this.update(&mut cx, |this, cx| {
567                    this.load_users(
568                        proto::GetUsers {
569                            user_ids: user_ids_to_fetch,
570                        },
571                        cx,
572                    )
573                })?
574                .await?;
575            }
576
577            this.update(&mut cx, |this, _| {
578                user_ids
579                    .iter()
580                    .map(|user_id| {
581                        this.users
582                            .get(user_id)
583                            .cloned()
584                            .ok_or_else(|| anyhow!("user {} not found", user_id))
585                    })
586                    .collect()
587            })?
588        })
589    }
590
591    pub fn fuzzy_search_users(
592        &mut self,
593        query: String,
594        cx: &mut ModelContext<Self>,
595    ) -> Task<Result<Vec<Arc<User>>>> {
596        self.load_users(proto::FuzzySearchUsers { query }, cx)
597    }
598
599    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
600        self.users.get(&user_id).cloned()
601    }
602
603    pub fn get_user(
604        &mut self,
605        user_id: u64,
606        cx: &mut ModelContext<Self>,
607    ) -> Task<Result<Arc<User>>> {
608        if let Some(user) = self.users.get(&user_id).cloned() {
609            return Task::ready(Ok(user));
610        }
611
612        let load_users = self.get_users(vec![user_id], cx);
613        cx.spawn(move |this, mut cx| async move {
614            load_users.await?;
615            this.update(&mut cx, |this, _| {
616                this.users
617                    .get(&user_id)
618                    .cloned()
619                    .ok_or_else(|| anyhow!("server responded with no users"))
620            })?
621        })
622    }
623
624    pub fn current_user(&self) -> Option<Arc<User>> {
625        self.current_user.borrow().clone()
626    }
627
628    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
629        self.current_user.clone()
630    }
631
632    fn load_users(
633        &mut self,
634        request: impl RequestMessage<Response = UsersResponse>,
635        cx: &mut ModelContext<Self>,
636    ) -> Task<Result<Vec<Arc<User>>>> {
637        let client = self.client.clone();
638        let http = self.http.clone();
639        cx.spawn(|this, mut cx| async move {
640            if let Some(rpc) = client.upgrade() {
641                let response = rpc.request(request).await.context("error loading users")?;
642                let users = future::join_all(
643                    response
644                        .users
645                        .into_iter()
646                        .map(|user| User::new(user, http.as_ref())),
647                )
648                .await;
649
650                this.update(&mut cx, |this, _| {
651                    for user in &users {
652                        this.users.insert(user.id, user.clone());
653                    }
654                })
655                .ok();
656
657                Ok(users)
658            } else {
659                Ok(Vec::new())
660            }
661        })
662    }
663
664    pub fn set_participant_indices(
665        &mut self,
666        participant_indices: HashMap<u64, ParticipantIndex>,
667        cx: &mut ModelContext<Self>,
668    ) {
669        if participant_indices != self.participant_indices {
670            self.participant_indices = participant_indices;
671            cx.emit(Event::ParticipantIndicesChanged);
672        }
673    }
674
675    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
676        &self.participant_indices
677    }
678}
679
680impl User {
681    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
682        Arc::new(User {
683            id: message.id,
684            github_login: message.github_login,
685            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
686        })
687    }
688}
689
690impl Contact {
691    async fn from_proto(
692        contact: proto::Contact,
693        user_store: &Model<UserStore>,
694        cx: &mut AsyncAppContext,
695    ) -> Result<Self> {
696        let user = user_store
697            .update(cx, |user_store, cx| {
698                user_store.get_user(contact.user_id, cx)
699            })?
700            .await?;
701        Ok(Self {
702            user,
703            online: contact.online,
704            busy: contact.busy,
705        })
706    }
707}
708
709impl Collaborator {
710    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
711        Ok(Self {
712            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
713            replica_id: message.replica_id as ReplicaId,
714            user_id: message.user_id as UserId,
715        })
716    }
717}
718
719// todo!("we probably don't need this now that we fetch")
720async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
721    let mut response = http
722        .get(url, Default::default(), true)
723        .await
724        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
725
726    if !response.status().is_success() {
727        return Err(anyhow!("avatar request failed {:?}", response.status()));
728    }
729
730    let mut body = Vec::new();
731    response
732        .body_mut()
733        .read_to_end(&mut body)
734        .await
735        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
736    let format = image::guess_format(&body)?;
737    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
738    Ok(Arc::new(ImageData::new(image)))
739}