user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags::FeatureFlagAppExt;
  5use futures::{channel::mpsc, Future, StreamExt};
  6use gpui::{AsyncAppContext, EventEmitter, Model, ModelContext, SharedUrl, Task};
  7use postage::{sink::Sink, watch};
  8use rpc::proto::{RequestMessage, UsersResponse};
  9use std::sync::{Arc, Weak};
 10use text::ReplicaId;
 11use util::TryFutureExt as _;
 12
/// Numeric identifier for a user; an alias for `u64`.
pub type UserId = u64;
 14
/// Newtype wrapper around a `u32` participant index.
///
/// `UserStore` keeps one of these per user id in its `participant_indices` map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 17
/// A user record as cached and handed out by `UserStore`.
///
/// Equality and ordering are implemented by hand below rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Identifier copied from the `id` field of the protobuf user message.
    pub id: UserId,
    /// GitHub login name; `UserStore` keeps its contact lists sorted by this field.
    pub github_login: String,
    /// Avatar location, converted from the protobuf `avatar_url` field.
    pub avatar_uri: SharedUrl,
}
 24
/// A collaborator as decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; decoding fails when it is absent.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the message's `replica_id` field.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
}
 31
impl PartialOrd for User {
    /// Delegates to `Ord::cmp`, so the partial and total orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
 37
 38impl Ord for User {
 39    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 40        self.github_login.cmp(&other.github_login)
 41    }
 42}
 43
 44impl PartialEq for User {
 45    fn eq(&self, other: &Self) -> bool {
 46        self.id == other.id && self.github_login == other.github_login
 47    }
 48}
 49
// Marker impl: the hand-written `PartialEq` compares only a `u64` and a `String`.
impl Eq for User {}
 51
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Copied from the message's `online` flag.
    pub online: bool,
    /// Copied from the message's `busy` flag.
    pub busy: bool,
}
 58
/// Where a user stands relative to the current user's contact lists, as
/// reported by `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's lists.
    None,
    /// The user is in the outgoing request list.
    RequestSent,
    /// The user is in the incoming request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
 66
/// Model holding the user cache, the current user, and the contact lists.
pub struct UserStore {
    /// Cache of users keyed by id; filled by `load_users`.
    users: HashMap<u64, Arc<User>>,
    /// Replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest current user, as published by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by `github_login` so they can be binary searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, upgraded whenever a request is sent.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
}
 81
/// Invite information copied from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The message's `count` field.
    // NOTE(review): presumably the number of invites available — confirm against the server.
    pub count: u32,
    /// The message's `url` field.
    pub url: Arc<str>,
}
 87
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`.
    ///
    /// In this file it is only emitted with `ContactEventKind::Cancelled`,
    /// when an incoming request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message arrives.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the map actually changes.
    ParticipantIndicesChanged,
}
 96
/// What happened to the contact request carried by `Event::Contact`.
// NOTE(review): only `Cancelled` is constructed in this file; the other
// variants are presumably used elsewhere — confirm before removing any.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
103
// Lets `UserStore` emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
105
/// Messages queued on `update_contacts_tx` and handled in order by
/// `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear the contact and request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
111
112impl UserStore {
    /// Creates a store bound to `client` and spawns its two background tasks.
    ///
    /// * `_maintain_contacts` drains the contact-update queue, applying one
    ///   message at a time and waiting for it to finish before taking the
    ///   next. It also owns the RPC subscriptions, so the message handlers
    ///   stay registered for as long as the task lives.
    /// * `_maintain_current_user` follows the client's status stream: it
    ///   publishes the signed-in user once connected, publishes `None` on
    ///   sign-out, and clears the contact lists on sign-out or connection loss.
    ///
    /// The store itself keeps only a `Weak<Client>`; the strong `Arc` passed
    /// in is moved into the `_maintain_current_user` task.
    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                // Moved in so the subscriptions are dropped together with this task.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        // Finish this update (logging any error) before taking the next one.
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_metrics_id =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both requests concurrently; a failure is logged and
                                // shows up here as `None`.
                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        cx.update_flags(info.staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            info.staff,
                                        )
                                    }
                                })?;

                                // Publish the user (or `None` if the fetch failed).
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Unlike sign-out, the current user is left in place here.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
        }
    }
199
    /// Empties the user cache, so later lookups have to be loaded again.
    ///
    /// Only compiled when the `test-support` feature is enabled.
    #[cfg(feature = "test-support")]
    pub fn clear_cache(&mut self) {
        self.users.clear();
    }
204
205    async fn handle_update_invite_info(
206        this: Model<Self>,
207        message: TypedEnvelope<proto::UpdateInviteInfo>,
208        _: Arc<Client>,
209        mut cx: AsyncAppContext,
210    ) -> Result<()> {
211        this.update(&mut cx, |this, cx| {
212            this.invite_info = Some(InviteInfo {
213                url: Arc::from(message.payload.url),
214                count: message.payload.count,
215            });
216            cx.notify();
217        })?;
218        Ok(())
219    }
220
    /// Message handler for `proto::ShowContacts`: emits `Event::ShowContacts`.
    /// The message payload itself is ignored.
    async fn handle_show_contacts(
        this: Model<Self>,
        _: TypedEnvelope<proto::ShowContacts>,
        _: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
        Ok(())
    }
230
    /// Returns the most recently received invite info, or `None` if no
    /// `UpdateInviteInfo` message has been handled yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
234
235    async fn handle_update_contacts(
236        this: Model<Self>,
237        message: TypedEnvelope<proto::UpdateContacts>,
238        _: Arc<Client>,
239        mut cx: AsyncAppContext,
240    ) -> Result<()> {
241        this.update(&mut cx, |this, _| {
242            this.update_contacts_tx
243                .unbounded_send(UpdateContacts::Update(message.payload))
244                .unwrap();
245        })?;
246        Ok(())
247    }
248
    /// Applies one queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops its barrier sender, releasing whoever is waiting on it.
    /// * `Clear` empties the contact and request lists, then drops its barrier sender.
    /// * `Update` first loads every user the message mentions, then applies the
    ///   removals and insertions to the three lists and notifies observers.
    ///
    /// All three lists are kept sorted by `github_login`; insertions use a
    /// binary search to find their slot, replacing an existing entry with the
    /// same login.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the message refers to so they can be
                // loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in the call to get_users.
                    // No need to parallelize here.
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a
                        // `Cancelled` event for each one removed
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
377
    /// Returns the current contacts, sorted by `github_login`.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
381
382    pub fn has_contact(&self, user: &Arc<User>) -> bool {
383        self.contacts
384            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
385            .is_ok()
386    }
387
    /// Returns the users who have sent us a contact request, sorted by `github_login`.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
391
    /// Returns the users we have sent a contact request to, sorted by `github_login`.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
395
    /// Reports whether at least one contact request involving `user` is
    /// still in flight (see `perform_contact_request`).
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
399
400    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
401        if self
402            .contacts
403            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
404            .is_ok()
405        {
406            ContactRequestStatus::RequestAccepted
407        } else if self
408            .outgoing_contact_requests
409            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
410            .is_ok()
411        {
412            ContactRequestStatus::RequestSent
413        } else if self
414            .incoming_contact_requests
415            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
416            .is_ok()
417        {
418            ContactRequestStatus::RequestReceived
419        } else {
420            ContactRequestStatus::None
421        }
422    }
423
424    pub fn request_contact(
425        &mut self,
426        responder_id: u64,
427        cx: &mut ModelContext<Self>,
428    ) -> Task<Result<()>> {
429        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
430    }
431
432    pub fn remove_contact(
433        &mut self,
434        user_id: u64,
435        cx: &mut ModelContext<Self>,
436    ) -> Task<Result<()>> {
437        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
438    }
439
440    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
441        self.incoming_contact_requests
442            .iter()
443            .any(|user| user.id == user_id)
444    }
445
446    pub fn respond_to_contact_request(
447        &mut self,
448        requester_id: u64,
449        accept: bool,
450        cx: &mut ModelContext<Self>,
451    ) -> Task<Result<()>> {
452        self.perform_contact_request(
453            requester_id,
454            proto::RespondToContactRequest {
455                requester_id,
456                response: if accept {
457                    proto::ContactRequestResponse::Accept
458                } else {
459                    proto::ContactRequestResponse::Decline
460                } as i32,
461            },
462            cx,
463        )
464    }
465
466    pub fn dismiss_contact_request(
467        &mut self,
468        requester_id: u64,
469        cx: &mut ModelContext<Self>,
470    ) -> Task<Result<()>> {
471        let client = self.client.upgrade();
472        cx.spawn(move |_, _| async move {
473            client
474                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
475                .request(proto::RespondToContactRequest {
476                    requester_id,
477                    response: proto::ContactRequestResponse::Dismiss as i32,
478                })
479                .await?;
480            Ok(())
481        })
482    }
483
484    fn perform_contact_request<T: RequestMessage>(
485        &mut self,
486        user_id: u64,
487        request: T,
488        cx: &mut ModelContext<Self>,
489    ) -> Task<Result<()>> {
490        let client = self.client.upgrade();
491        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
492        cx.notify();
493
494        cx.spawn(move |this, mut cx| async move {
495            let response = client
496                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
497                .request(request)
498                .await;
499            this.update(&mut cx, |this, cx| {
500                if let Entry::Occupied(mut request_count) =
501                    this.pending_contact_requests.entry(user_id)
502                {
503                    *request_count.get_mut() -= 1;
504                    if *request_count.get() == 0 {
505                        request_count.remove();
506                    }
507                }
508                cx.notify();
509            })?;
510            response?;
511            Ok(())
512        })
513    }
514
515    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
516        let (tx, mut rx) = postage::barrier::channel();
517        self.update_contacts_tx
518            .unbounded_send(UpdateContacts::Clear(tx))
519            .unwrap();
520        async move {
521            rx.next().await;
522        }
523    }
524
525    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
526        let (tx, mut rx) = postage::barrier::channel();
527        self.update_contacts_tx
528            .unbounded_send(UpdateContacts::Wait(tx))
529            .unwrap();
530        async move {
531            rx.next().await;
532        }
533    }
534
535    pub fn get_users(
536        &mut self,
537        user_ids: Vec<u64>,
538        cx: &mut ModelContext<Self>,
539    ) -> Task<Result<Vec<Arc<User>>>> {
540        let mut user_ids_to_fetch = user_ids.clone();
541        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
542
543        cx.spawn(|this, mut cx| async move {
544            if !user_ids_to_fetch.is_empty() {
545                this.update(&mut cx, |this, cx| {
546                    this.load_users(
547                        proto::GetUsers {
548                            user_ids: user_ids_to_fetch,
549                        },
550                        cx,
551                    )
552                })?
553                .await?;
554            }
555
556            this.update(&mut cx, |this, _| {
557                user_ids
558                    .iter()
559                    .map(|user_id| {
560                        this.users
561                            .get(user_id)
562                            .cloned()
563                            .ok_or_else(|| anyhow!("user {} not found", user_id))
564                    })
565                    .collect()
566            })?
567        })
568    }
569
    /// Sends a `FuzzySearchUsers` request for `query` and returns the users
    /// in the response. Through `load_users`, those users are also added to
    /// the cache.
    pub fn fuzzy_search_users(
        &mut self,
        query: String,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<Arc<User>>>> {
        self.load_users(proto::FuzzySearchUsers { query }, cx)
    }
577
    /// Returns the user with `user_id` if it is already in the cache; never
    /// contacts the server.
    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
        self.users.get(&user_id).cloned()
    }
581
582    pub fn get_user(
583        &mut self,
584        user_id: u64,
585        cx: &mut ModelContext<Self>,
586    ) -> Task<Result<Arc<User>>> {
587        if let Some(user) = self.users.get(&user_id).cloned() {
588            return Task::ready(Ok(user));
589        }
590
591        let load_users = self.get_users(vec![user_id], cx);
592        cx.spawn(move |this, mut cx| async move {
593            load_users.await?;
594            this.update(&mut cx, |this, _| {
595                this.users
596                    .get(&user_id)
597                    .cloned()
598                    .ok_or_else(|| anyhow!("server responded with no users"))
599            })?
600        })
601    }
602
    /// Returns the value currently held by the current-user watch channel:
    /// the signed-in user, or `None`.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
606
    /// Returns a clone of the current-user watch receiver, for callers that
    /// want to observe changes themselves.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
610
611    fn load_users(
612        &mut self,
613        request: impl RequestMessage<Response = UsersResponse>,
614        cx: &mut ModelContext<Self>,
615    ) -> Task<Result<Vec<Arc<User>>>> {
616        let client = self.client.clone();
617        cx.spawn(|this, mut cx| async move {
618            if let Some(rpc) = client.upgrade() {
619                let response = rpc.request(request).await.context("error loading users")?;
620                let users = response
621                    .users
622                    .into_iter()
623                    .map(|user| User::new(user))
624                    .collect::<Vec<_>>();
625
626                this.update(&mut cx, |this, _| {
627                    for user in &users {
628                        this.users.insert(user.id, user.clone());
629                    }
630                })
631                .ok();
632
633                Ok(users)
634            } else {
635                Ok(Vec::new())
636            }
637        })
638    }
639
640    pub fn set_participant_indices(
641        &mut self,
642        participant_indices: HashMap<u64, ParticipantIndex>,
643        cx: &mut ModelContext<Self>,
644    ) {
645        if participant_indices != self.participant_indices {
646            self.participant_indices = participant_indices;
647            cx.emit(Event::ParticipantIndicesChanged);
648        }
649    }
650
    /// Returns the participant index map, keyed by user id.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
654}
655
656impl User {
657    fn new(message: proto::User) -> Arc<Self> {
658        Arc::new(User {
659            id: message.id,
660            github_login: message.github_login,
661            avatar_uri: message.avatar_url.into(),
662        })
663    }
664}
665
666impl Contact {
667    async fn from_proto(
668        contact: proto::Contact,
669        user_store: &Model<UserStore>,
670        cx: &mut AsyncAppContext,
671    ) -> Result<Self> {
672        let user = user_store
673            .update(cx, |user_store, cx| {
674                user_store.get_user(contact.user_id, cx)
675            })?
676            .await?;
677        Ok(Self {
678            user,
679            online: contact.online,
680            busy: contact.busy,
681        })
682    }
683}
684
impl Collaborator {
    /// Builds a `Collaborator` from its protobuf message.
    ///
    /// # Errors
    ///
    /// Fails with "invalid peer id" when the message carries no `peer_id`.
    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
        Ok(Self {
            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
            // NOTE(review): `as` truncates silently if `ReplicaId` is narrower than
            // the wire type — confirm every valid replica id fits.
            replica_id: message.replica_id as ReplicaId,
            user_id: message.user_id as UserId,
        })
    }
}