user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags::FeatureFlagAppExt;
  5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  6use gpui2::{AsyncAppContext, EventEmitter, Handle, ImageData, ModelContext, Task};
  7use postage::{sink::Sink, watch};
  8use rpc::proto::{RequestMessage, UsersResponse};
  9use std::sync::{Arc, Weak};
 10use text::ReplicaId;
 11use util::http::HttpClient;
 12use util::TryFutureExt as _;
 13
 14pub type UserId = u64;
 15
 16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 17pub struct ParticipantIndex(pub u32);
 18
 19#[derive(Default, Debug)]
 20pub struct User {
 21    pub id: UserId,
 22    pub github_login: String,
 23    pub avatar: Option<Arc<ImageData>>,
 24}
 25
 26#[derive(Clone, Debug, PartialEq, Eq)]
 27pub struct Collaborator {
 28    pub peer_id: proto::PeerId,
 29    pub replica_id: ReplicaId,
 30    pub user_id: UserId,
 31}
 32
 33impl PartialOrd for User {
 34    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 35        Some(self.cmp(other))
 36    }
 37}
 38
 39impl Ord for User {
 40    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 41        self.github_login.cmp(&other.github_login)
 42    }
 43}
 44
 45impl PartialEq for User {
 46    fn eq(&self, other: &Self) -> bool {
 47        self.id == other.id && self.github_login == other.github_login
 48    }
 49}
 50
 51impl Eq for User {}
 52
 53#[derive(Debug, PartialEq)]
 54pub struct Contact {
 55    pub user: Arc<User>,
 56    pub online: bool,
 57    pub busy: bool,
 58}
 59
 60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 61pub enum ContactRequestStatus {
 62    None,
 63    RequestSent,
 64    RequestReceived,
 65    RequestAccepted,
 66}
 67
 68pub struct UserStore {
 69    users: HashMap<u64, Arc<User>>,
 70    participant_indices: HashMap<u64, ParticipantIndex>,
 71    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 72    current_user: watch::Receiver<Option<Arc<User>>>,
 73    contacts: Vec<Arc<Contact>>,
 74    incoming_contact_requests: Vec<Arc<User>>,
 75    outgoing_contact_requests: Vec<Arc<User>>,
 76    pending_contact_requests: HashMap<u64, usize>,
 77    invite_info: Option<InviteInfo>,
 78    client: Weak<Client>,
 79    http: Arc<dyn HttpClient>,
 80    _maintain_contacts: Task<()>,
 81    _maintain_current_user: Task<()>,
 82}
 83
 84#[derive(Clone)]
 85pub struct InviteInfo {
 86    pub count: u32,
 87    pub url: Arc<str>,
 88}
 89
 90pub enum Event {
 91    Contact {
 92        user: Arc<User>,
 93        kind: ContactEventKind,
 94    },
 95    ShowContacts,
 96    ParticipantIndicesChanged,
 97}
 98
 99#[derive(Clone, Copy)]
100pub enum ContactEventKind {
101    Requested,
102    Accepted,
103    Cancelled,
104}
105
106impl EventEmitter for UserStore {
107    type Event = Event;
108}
109
110enum UpdateContacts {
111    Update(proto::UpdateContacts),
112    Wait(postage::barrier::Sender),
113    Clear(postage::barrier::Sender),
114}
115
116impl UserStore {
117    pub fn new(
118        client: Arc<Client>,
119        http: Arc<dyn HttpClient>,
120        cx: &mut ModelContext<Self>,
121    ) -> Self {
122        let (mut current_user_tx, current_user_rx) = watch::channel();
123        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
124        let rpc_subscriptions = vec![
125            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
126            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
127            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
128        ];
129        Self {
130            users: Default::default(),
131            current_user: current_user_rx,
132            contacts: Default::default(),
133            incoming_contact_requests: Default::default(),
134            participant_indices: Default::default(),
135            outgoing_contact_requests: Default::default(),
136            invite_info: None,
137            client: Arc::downgrade(&client),
138            update_contacts_tx,
139            http,
140            _maintain_contacts: cx.spawn(|this, mut cx| async move {
141                let _subscriptions = rpc_subscriptions;
142                while let Some(message) = update_contacts_rx.next().await {
143                    if let Ok(task) =
144                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
145                    {
146                        task.log_err().await;
147                    } else {
148                        break;
149                    }
150                }
151            }),
152            _maintain_current_user: cx.spawn(|this, mut cx| async move {
153                let mut status = client.status();
154                while let Some(status) = status.next().await {
155                    match status {
156                        Status::Connected { .. } => {
157                            if let Some(user_id) = client.user_id() {
158                                let fetch_user = if let Ok(fetch_user) = this
159                                    .update(&mut cx, |this, cx| {
160                                        this.get_user(user_id, cx).log_err()
161                                    }) {
162                                    fetch_user
163                                } else {
164                                    break;
165                                };
166                                let fetch_metrics_id =
167                                    client.request(proto::GetPrivateUserInfo {}).log_err();
168                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
169
170                                if let Some(info) = info {
171                                    cx.update(|cx| {
172                                        cx.update_flags(info.staff, info.flags);
173                                        client.telemetry.set_authenticated_user_info(
174                                            Some(info.metrics_id.clone()),
175                                            info.staff,
176                                            cx,
177                                        )
178                                    });
179                                } else {
180                                    cx.read(|cx| {
181                                        client
182                                            .telemetry
183                                            .set_authenticated_user_info(None, false, cx)
184                                    });
185                                }
186
187                                current_user_tx.send(user).await.ok();
188
189                                this.update(&mut cx, |_, cx| {
190                                    cx.notify();
191                                });
192                            }
193                        }
194                        Status::SignedOut => {
195                            current_user_tx.send(None).await.ok();
196                            if let Some(this) = this.upgrade(&cx) {
197                                this.update(&mut cx, |this, cx| {
198                                    cx.notify();
199                                    this.clear_contacts()
200                                })
201                                .await;
202                            }
203                        }
204                        Status::ConnectionLost => {
205                            if let Some(this) = this.upgrade(&cx) {
206                                this.update(&mut cx, |this, cx| {
207                                    cx.notify();
208                                    this.clear_contacts()
209                                })
210                                .await;
211                            }
212                        }
213                        _ => {}
214                    }
215                }
216            }),
217            pending_contact_requests: Default::default(),
218        }
219    }
220
221    #[cfg(feature = "test-support")]
222    pub fn clear_cache(&mut self) {
223        self.users.clear();
224    }
225
226    async fn handle_update_invite_info(
227        this: Handle<Self>,
228        message: TypedEnvelope<proto::UpdateInviteInfo>,
229        _: Arc<Client>,
230        mut cx: AsyncAppContext,
231    ) -> Result<()> {
232        this.update(&mut cx, |this, cx| {
233            this.invite_info = Some(InviteInfo {
234                url: Arc::from(message.payload.url),
235                count: message.payload.count,
236            });
237            cx.notify();
238        });
239        Ok(())
240    }
241
242    async fn handle_show_contacts(
243        this: Handle<Self>,
244        _: TypedEnvelope<proto::ShowContacts>,
245        _: Arc<Client>,
246        mut cx: AsyncAppContext,
247    ) -> Result<()> {
248        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
249        Ok(())
250    }
251
252    pub fn invite_info(&self) -> Option<&InviteInfo> {
253        self.invite_info.as_ref()
254    }
255
256    async fn handle_update_contacts(
257        this: Handle<Self>,
258        message: TypedEnvelope<proto::UpdateContacts>,
259        _: Arc<Client>,
260        mut cx: AsyncAppContext,
261    ) -> Result<()> {
262        this.update(&mut cx, |this, _| {
263            this.update_contacts_tx
264                .unbounded_send(UpdateContacts::Update(message.payload))
265                .unwrap();
266        });
267        Ok(())
268    }
269
270    fn update_contacts(
271        &mut self,
272        message: UpdateContacts,
273        cx: &mut ModelContext<Self>,
274    ) -> Task<Result<()>> {
275        match message {
276            UpdateContacts::Wait(barrier) => {
277                drop(barrier);
278                Task::ready(Ok(()))
279            }
280            UpdateContacts::Clear(barrier) => {
281                self.contacts.clear();
282                self.incoming_contact_requests.clear();
283                self.outgoing_contact_requests.clear();
284                drop(barrier);
285                Task::ready(Ok(()))
286            }
287            UpdateContacts::Update(message) => {
288                let mut user_ids = HashSet::default();
289                for contact in &message.contacts {
290                    user_ids.insert(contact.user_id);
291                }
292                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
293                user_ids.extend(message.outgoing_requests.iter());
294
295                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
296                cx.spawn(|this, mut cx| async move {
297                    load_users.await?;
298
299                    // Users are fetched in parallel above and cached in call to get_users
300                    // No need to paralellize here
301                    let mut updated_contacts = Vec::new();
302                    for contact in message.contacts {
303                        let should_notify = contact.should_notify;
304                        updated_contacts.push((
305                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
306                            should_notify,
307                        ));
308                    }
309
310                    let mut incoming_requests = Vec::new();
311                    for request in message.incoming_requests {
312                        incoming_requests.push({
313                            let user = this
314                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
315                                .await?;
316                            (user, request.should_notify)
317                        });
318                    }
319
320                    let mut outgoing_requests = Vec::new();
321                    for requested_user_id in message.outgoing_requests {
322                        outgoing_requests.push(
323                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
324                                .await?,
325                        );
326                    }
327
328                    let removed_contacts =
329                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
330                    let removed_incoming_requests =
331                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
332                    let removed_outgoing_requests =
333                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
334
335                    this.update(&mut cx, |this, cx| {
336                        // Remove contacts
337                        this.contacts
338                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
339                        // Update existing contacts and insert new ones
340                        for (updated_contact, should_notify) in updated_contacts {
341                            if should_notify {
342                                cx.emit(Event::Contact {
343                                    user: updated_contact.user.clone(),
344                                    kind: ContactEventKind::Accepted,
345                                });
346                            }
347                            match this.contacts.binary_search_by_key(
348                                &&updated_contact.user.github_login,
349                                |contact| &contact.user.github_login,
350                            ) {
351                                Ok(ix) => this.contacts[ix] = updated_contact,
352                                Err(ix) => this.contacts.insert(ix, updated_contact),
353                            }
354                        }
355
356                        // Remove incoming contact requests
357                        this.incoming_contact_requests.retain(|user| {
358                            if removed_incoming_requests.contains(&user.id) {
359                                cx.emit(Event::Contact {
360                                    user: user.clone(),
361                                    kind: ContactEventKind::Cancelled,
362                                });
363                                false
364                            } else {
365                                true
366                            }
367                        });
368                        // Update existing incoming requests and insert new ones
369                        for (user, should_notify) in incoming_requests {
370                            if should_notify {
371                                cx.emit(Event::Contact {
372                                    user: user.clone(),
373                                    kind: ContactEventKind::Requested,
374                                });
375                            }
376
377                            match this
378                                .incoming_contact_requests
379                                .binary_search_by_key(&&user.github_login, |contact| {
380                                    &contact.github_login
381                                }) {
382                                Ok(ix) => this.incoming_contact_requests[ix] = user,
383                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
384                            }
385                        }
386
387                        // Remove outgoing contact requests
388                        this.outgoing_contact_requests
389                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
390                        // Update existing incoming requests and insert new ones
391                        for request in outgoing_requests {
392                            match this
393                                .outgoing_contact_requests
394                                .binary_search_by_key(&&request.github_login, |contact| {
395                                    &contact.github_login
396                                }) {
397                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
398                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
399                            }
400                        }
401
402                        cx.notify();
403                    });
404
405                    Ok(())
406                })
407            }
408        }
409    }
410
411    pub fn contacts(&self) -> &[Arc<Contact>] {
412        &self.contacts
413    }
414
415    pub fn has_contact(&self, user: &Arc<User>) -> bool {
416        self.contacts
417            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
418            .is_ok()
419    }
420
421    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
422        &self.incoming_contact_requests
423    }
424
425    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
426        &self.outgoing_contact_requests
427    }
428
429    pub fn is_contact_request_pending(&self, user: &User) -> bool {
430        self.pending_contact_requests.contains_key(&user.id)
431    }
432
433    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
434        if self
435            .contacts
436            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
437            .is_ok()
438        {
439            ContactRequestStatus::RequestAccepted
440        } else if self
441            .outgoing_contact_requests
442            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
443            .is_ok()
444        {
445            ContactRequestStatus::RequestSent
446        } else if self
447            .incoming_contact_requests
448            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
449            .is_ok()
450        {
451            ContactRequestStatus::RequestReceived
452        } else {
453            ContactRequestStatus::None
454        }
455    }
456
457    pub fn request_contact(
458        &mut self,
459        responder_id: u64,
460        cx: &mut ModelContext<Self>,
461    ) -> Task<Result<()>> {
462        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
463    }
464
465    pub fn remove_contact(
466        &mut self,
467        user_id: u64,
468        cx: &mut ModelContext<Self>,
469    ) -> Task<Result<()>> {
470        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
471    }
472
473    pub fn respond_to_contact_request(
474        &mut self,
475        requester_id: u64,
476        accept: bool,
477        cx: &mut ModelContext<Self>,
478    ) -> Task<Result<()>> {
479        self.perform_contact_request(
480            requester_id,
481            proto::RespondToContactRequest {
482                requester_id,
483                response: if accept {
484                    proto::ContactRequestResponse::Accept
485                } else {
486                    proto::ContactRequestResponse::Decline
487                } as i32,
488            },
489            cx,
490        )
491    }
492
493    pub fn dismiss_contact_request(
494        &mut self,
495        requester_id: u64,
496        cx: &mut ModelContext<Self>,
497    ) -> Task<Result<()>> {
498        let client = self.client.upgrade();
499        cx.spawn_weak(|_, _| async move {
500            client
501                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
502                .request(proto::RespondToContactRequest {
503                    requester_id,
504                    response: proto::ContactRequestResponse::Dismiss as i32,
505                })
506                .await?;
507            Ok(())
508        })
509    }
510
511    fn perform_contact_request<T: RequestMessage>(
512        &mut self,
513        user_id: u64,
514        request: T,
515        cx: &mut ModelContext<Self>,
516    ) -> Task<Result<()>> {
517        let client = self.client.upgrade();
518        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
519        cx.notify();
520
521        cx.spawn(|this, mut cx| async move {
522            let response = client
523                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
524                .request(request)
525                .await;
526            this.update(&mut cx, |this, cx| {
527                if let Entry::Occupied(mut request_count) =
528                    this.pending_contact_requests.entry(user_id)
529                {
530                    *request_count.get_mut() -= 1;
531                    if *request_count.get() == 0 {
532                        request_count.remove();
533                    }
534                }
535                cx.notify();
536            });
537            response?;
538            Ok(())
539        })
540    }
541
542    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
543        let (tx, mut rx) = postage::barrier::channel();
544        self.update_contacts_tx
545            .unbounded_send(UpdateContacts::Clear(tx))
546            .unwrap();
547        async move {
548            rx.next().await;
549        }
550    }
551
552    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
553        let (tx, mut rx) = postage::barrier::channel();
554        self.update_contacts_tx
555            .unbounded_send(UpdateContacts::Wait(tx))
556            .unwrap();
557        async move {
558            rx.next().await;
559        }
560    }
561
562    pub fn get_users(
563        &mut self,
564        user_ids: Vec<u64>,
565        cx: &mut ModelContext<Self>,
566    ) -> Task<Result<Vec<Arc<User>>>> {
567        let mut user_ids_to_fetch = user_ids.clone();
568        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
569
570        cx.spawn(|this, mut cx| async move {
571            if !user_ids_to_fetch.is_empty() {
572                this.update(&mut cx, |this, cx| {
573                    this.load_users(
574                        proto::GetUsers {
575                            user_ids: user_ids_to_fetch,
576                        },
577                        cx,
578                    )
579                })
580                .await?;
581            }
582
583            this.read_with(&cx, |this, _| {
584                user_ids
585                    .iter()
586                    .map(|user_id| {
587                        this.users
588                            .get(user_id)
589                            .cloned()
590                            .ok_or_else(|| anyhow!("user {} not found", user_id))
591                    })
592                    .collect()
593            })
594        })
595    }
596
597    pub fn fuzzy_search_users(
598        &mut self,
599        query: String,
600        cx: &mut ModelContext<Self>,
601    ) -> Task<Result<Vec<Arc<User>>>> {
602        self.load_users(proto::FuzzySearchUsers { query }, cx)
603    }
604
605    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
606        self.users.get(&user_id).cloned()
607    }
608
609    pub fn get_user(
610        &mut self,
611        user_id: u64,
612        cx: &mut ModelContext<Self>,
613    ) -> Task<Result<Arc<User>>> {
614        if let Some(user) = self.users.get(&user_id).cloned() {
615            return Task::ready(Ok(user));
616        }
617
618        let load_users = self.get_users(vec![user_id], cx);
619        cx.spawn(|this, mut cx| async move {
620            load_users.await?;
621            this.update(&mut cx, |this, _| {
622                this.users
623                    .get(&user_id)
624                    .cloned()
625                    .ok_or_else(|| anyhow!("server responded with no users"))
626            })?
627        })
628    }
629
630    pub fn current_user(&self) -> Option<Arc<User>> {
631        self.current_user.borrow().clone()
632    }
633
634    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
635        self.current_user.clone()
636    }
637
638    fn load_users(
639        &mut self,
640        request: impl RequestMessage<Response = UsersResponse>,
641        cx: &mut ModelContext<Self>,
642    ) -> Task<Result<Vec<Arc<User>>>> {
643        let client = self.client.clone();
644        let http = self.http.clone();
645        cx.spawn(|this, mut cx| async move {
646            if let Some(rpc) = client.upgrade() {
647                let response = rpc.request(request).await.context("error loading users")?;
648                let users = future::join_all(
649                    response
650                        .users
651                        .into_iter()
652                        .map(|user| User::new(user, http.as_ref())),
653                )
654                .await;
655
656                this.update(&mut cx, |this, _| {
657                    for user in &users {
658                        this.users.insert(user.id, user.clone());
659                    }
660                })
661                .ok();
662
663                Ok(users)
664            } else {
665                Ok(Vec::new())
666            }
667        })
668    }
669
670    pub fn set_participant_indices(
671        &mut self,
672        participant_indices: HashMap<u64, ParticipantIndex>,
673        cx: &mut ModelContext<Self>,
674    ) {
675        if participant_indices != self.participant_indices {
676            self.participant_indices = participant_indices;
677            cx.emit(Event::ParticipantIndicesChanged);
678        }
679    }
680
681    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
682        &self.participant_indices
683    }
684}
685
686impl User {
687    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
688        Arc::new(User {
689            id: message.id,
690            github_login: message.github_login,
691            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
692        })
693    }
694}
695
696impl Contact {
697    async fn from_proto(
698        contact: proto::Contact,
699        user_store: &Handle<UserStore>,
700        cx: &mut AsyncAppContext,
701    ) -> Result<Self> {
702        let user = user_store
703            .update(cx, |user_store, cx| {
704                user_store.get_user(contact.user_id, cx)
705            })
706            .await?;
707        Ok(Self {
708            user,
709            online: contact.online,
710            busy: contact.busy,
711        })
712    }
713}
714
715impl Collaborator {
716    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
717        Ok(Self {
718            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
719            replica_id: message.replica_id as ReplicaId,
720            user_id: message.user_id as UserId,
721        })
722    }
723}
724
725// todo!("we probably don't need this now that we fetch")
726async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
727    let mut response = http
728        .get(url, Default::default(), true)
729        .await
730        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
731
732    if !response.status().is_success() {
733        return Err(anyhow!("avatar request failed {:?}", response.status()));
734    }
735
736    let mut body = Vec::new();
737    response
738        .body_mut()
739        .read_to_end(&mut body)
740        .await
741        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
742    let format = image::guess_format(&body)?;
743    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
744    Ok(Arc::new(ImageData::new(image)))
745}