user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags::FeatureFlagAppExt;
  5use futures::{channel::mpsc, Future, StreamExt};
  6use gpui::{
  7    AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
  8    WeakModel,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::TryFutureExt as _;
 15
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
 17
 18#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 19pub struct ChannelId(pub u64);
 20
 21impl std::fmt::Display for ChannelId {
 22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 23        self.0.fmt(f)
 24    }
 25}
 26
/// Newtype wrapper around a `u64` id.
///
/// NOTE(review): presumably identifies a hosted project (inferred from the name only;
/// the visible part of this file does not use it) — confirm against callers.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct HostedProjectId(pub u64);
 29
/// Newtype wrapper around a `u32` index.
///
/// `UserStore` keeps one of these per `u64` key in its `participant_indices` map; the
/// values themselves are supplied by callers of `UserStore::set_participant_indices`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 32
/// A user as known to this client, built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id.
    pub id: UserId,
    /// GitHub login name; the contact lists in `UserStore` are kept sorted by it.
    pub github_login: String,
    /// Avatar location, converted from the protobuf `avatar_url` field.
    pub avatar_uri: SharedUri,
}
 39
/// A collaborator, as decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; decoding fails when the message has none.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the message's `replica_id` field.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
}
 46
 47impl PartialOrd for User {
 48    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 49        Some(self.cmp(other))
 50    }
 51}
 52
 53impl Ord for User {
 54    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 55        self.github_login.cmp(&other.github_login)
 56    }
 57}
 58
 59impl PartialEq for User {
 60    fn eq(&self, other: &Self) -> bool {
 61        self.id == other.id && self.github_login == other.github_login
 62    }
 63}
 64
 65impl Eq for User {}
 66
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record, resolved through the `UserStore`.
    pub user: Arc<User>,
    /// Copied from the message's `online` field.
    pub online: bool,
    /// Copied from the message's `busy` field.
    pub busy: bool,
}
 73
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact lists.
    None,
    /// The user is in the outgoing contact requests list.
    RequestSent,
    /// The user is in the incoming contact requests list.
    RequestReceived,
    /// The user is already in the contacts list.
    RequestAccepted,
}
 81
/// Caches users and tracks the current user, contacts, and contact requests.
pub struct UserStore {
    /// Cache of loaded users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Participant indices set through `set_participant_indices`.
    /// NOTE(review): keys are presumably user ids — confirm against callers.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch channel holding the currently signed-in user, if any.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login (entries are placed via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received in an `UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client, so the store does not keep the client alive.
    client: Weak<Client>,
    /// Background task that applies queued contact updates one at a time.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store's own model, used from `&self` methods.
    weak_self: WeakModel<Self>,
}
 97
/// Invite information taken from the payload of an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The payload's `count` field.
    pub count: u32,
    /// The payload's `url` field.
    pub url: Arc<str>,
}
103
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`.
    /// In this file it is emitted with `ContactEventKind::Cancelled` when an incoming
    /// request is removed by a contacts update.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` actually changes the stored map.
    ParticipantIndicesChanged,
}
112
/// The kind of change reported by `Event::Contact`.
///
/// Only `Cancelled` is constructed in the visible part of this file; the other
/// variants are presumably produced elsewhere — confirm against callers.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
119
// Lets `UserStore` emit `Event` values, as done through `cx.emit` in this file.
impl EventEmitter<Event> for UserStore {}
121
/// Messages queued on `update_contacts_tx` and handled, in order, by
/// `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing but drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear the contact and contact-request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
127
128impl UserStore {
    /// Creates the store, registers its RPC message handlers, and spawns two tasks:
    ///
    /// - `_maintain_contacts` drains the contact-update queue, applying one message at a
    ///   time.
    /// - `_maintain_current_user` follows the client's connection status, publishing the
    ///   current user and clearing contacts when signed out or disconnected.
    ///
    /// The store itself only keeps a weak reference to `client`.
    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                // Move the handler registrations into the task so it owns them.
                let _subscriptions = rpc_subscriptions;
                // Each queued message is fully applied (its task awaited) before the next
                // one is taken, so updates are processed strictly in order.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                // Keep only a weak handle so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // If the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_metrics_id =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both fetches concurrently. Either may come back as
                                // `None` (`log_err` presumably logs the failure).
                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        cx.update_flags(info.staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            info.staff,
                                        )
                                    }
                                })?;

                                // Publish the (possibly absent) user to watchers.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Signing out resets the current user and clears contacts.
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Contacts are cleared, but the current user is left as is.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_model(),
        }
    }
222
223    #[cfg(feature = "test-support")]
224    pub fn clear_cache(&mut self) {
225        self.users.clear();
226    }
227
228    async fn handle_update_invite_info(
229        this: Model<Self>,
230        message: TypedEnvelope<proto::UpdateInviteInfo>,
231        _: Arc<Client>,
232        mut cx: AsyncAppContext,
233    ) -> Result<()> {
234        this.update(&mut cx, |this, cx| {
235            this.invite_info = Some(InviteInfo {
236                url: Arc::from(message.payload.url),
237                count: message.payload.count,
238            });
239            cx.notify();
240        })?;
241        Ok(())
242    }
243
244    async fn handle_show_contacts(
245        this: Model<Self>,
246        _: TypedEnvelope<proto::ShowContacts>,
247        _: Arc<Client>,
248        mut cx: AsyncAppContext,
249    ) -> Result<()> {
250        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
251        Ok(())
252    }
253
254    pub fn invite_info(&self) -> Option<&InviteInfo> {
255        self.invite_info.as_ref()
256    }
257
258    async fn handle_update_contacts(
259        this: Model<Self>,
260        message: TypedEnvelope<proto::UpdateContacts>,
261        _: Arc<Client>,
262        mut cx: AsyncAppContext,
263    ) -> Result<()> {
264        this.update(&mut cx, |this, _| {
265            this.update_contacts_tx
266                .unbounded_send(UpdateContacts::Update(message.payload))
267                .unwrap();
268        })?;
269        Ok(())
270    }
271
    /// Applies one queued `UpdateContacts` message.
    ///
    /// - `Wait` drops its barrier sender and completes immediately.
    /// - `Clear` empties the contact and request lists, then drops its barrier sender.
    /// - `Update` loads every referenced user, then applies the removals and upserts
    ///   from the server message to the three lists and notifies observers.
    ///
    /// The returned task fails if a user cannot be loaded or the store is gone.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update (deduplicated) so they
                // can all be requested in a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones.
                        // The binary search keeps the list sorted by GitHub login.
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled` event
                        // for each request that is dropped.
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
400
401    pub fn contacts(&self) -> &[Arc<Contact>] {
402        &self.contacts
403    }
404
405    pub fn has_contact(&self, user: &Arc<User>) -> bool {
406        self.contacts
407            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
408            .is_ok()
409    }
410
411    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
412        &self.incoming_contact_requests
413    }
414
415    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
416        &self.outgoing_contact_requests
417    }
418
419    pub fn is_contact_request_pending(&self, user: &User) -> bool {
420        self.pending_contact_requests.contains_key(&user.id)
421    }
422
423    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
424        if self
425            .contacts
426            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
427            .is_ok()
428        {
429            ContactRequestStatus::RequestAccepted
430        } else if self
431            .outgoing_contact_requests
432            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
433            .is_ok()
434        {
435            ContactRequestStatus::RequestSent
436        } else if self
437            .incoming_contact_requests
438            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
439            .is_ok()
440        {
441            ContactRequestStatus::RequestReceived
442        } else {
443            ContactRequestStatus::None
444        }
445    }
446
447    pub fn request_contact(
448        &mut self,
449        responder_id: u64,
450        cx: &mut ModelContext<Self>,
451    ) -> Task<Result<()>> {
452        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
453    }
454
455    pub fn remove_contact(
456        &mut self,
457        user_id: u64,
458        cx: &mut ModelContext<Self>,
459    ) -> Task<Result<()>> {
460        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
461    }
462
463    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
464        self.incoming_contact_requests
465            .iter()
466            .any(|user| user.id == user_id)
467    }
468
469    pub fn respond_to_contact_request(
470        &mut self,
471        requester_id: u64,
472        accept: bool,
473        cx: &mut ModelContext<Self>,
474    ) -> Task<Result<()>> {
475        self.perform_contact_request(
476            requester_id,
477            proto::RespondToContactRequest {
478                requester_id,
479                response: if accept {
480                    proto::ContactRequestResponse::Accept
481                } else {
482                    proto::ContactRequestResponse::Decline
483                } as i32,
484            },
485            cx,
486        )
487    }
488
489    pub fn dismiss_contact_request(
490        &mut self,
491        requester_id: u64,
492        cx: &mut ModelContext<Self>,
493    ) -> Task<Result<()>> {
494        let client = self.client.upgrade();
495        cx.spawn(move |_, _| async move {
496            client
497                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
498                .request(proto::RespondToContactRequest {
499                    requester_id,
500                    response: proto::ContactRequestResponse::Dismiss as i32,
501                })
502                .await?;
503            Ok(())
504        })
505    }
506
507    fn perform_contact_request<T: RequestMessage>(
508        &mut self,
509        user_id: u64,
510        request: T,
511        cx: &mut ModelContext<Self>,
512    ) -> Task<Result<()>> {
513        let client = self.client.upgrade();
514        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
515        cx.notify();
516
517        cx.spawn(move |this, mut cx| async move {
518            let response = client
519                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
520                .request(request)
521                .await;
522            this.update(&mut cx, |this, cx| {
523                if let Entry::Occupied(mut request_count) =
524                    this.pending_contact_requests.entry(user_id)
525                {
526                    *request_count.get_mut() -= 1;
527                    if *request_count.get() == 0 {
528                        request_count.remove();
529                    }
530                }
531                cx.notify();
532            })?;
533            response?;
534            Ok(())
535        })
536    }
537
538    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
539        let (tx, mut rx) = postage::barrier::channel();
540        self.update_contacts_tx
541            .unbounded_send(UpdateContacts::Clear(tx))
542            .unwrap();
543        async move {
544            rx.next().await;
545        }
546    }
547
548    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
549        let (tx, mut rx) = postage::barrier::channel();
550        self.update_contacts_tx
551            .unbounded_send(UpdateContacts::Wait(tx))
552            .unwrap();
553        async move {
554            rx.next().await;
555        }
556    }
557
558    pub fn get_users(
559        &mut self,
560        user_ids: Vec<u64>,
561        cx: &mut ModelContext<Self>,
562    ) -> Task<Result<Vec<Arc<User>>>> {
563        let mut user_ids_to_fetch = user_ids.clone();
564        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
565
566        cx.spawn(|this, mut cx| async move {
567            if !user_ids_to_fetch.is_empty() {
568                this.update(&mut cx, |this, cx| {
569                    this.load_users(
570                        proto::GetUsers {
571                            user_ids: user_ids_to_fetch,
572                        },
573                        cx,
574                    )
575                })?
576                .await?;
577            }
578
579            this.update(&mut cx, |this, _| {
580                user_ids
581                    .iter()
582                    .map(|user_id| {
583                        this.users
584                            .get(user_id)
585                            .cloned()
586                            .ok_or_else(|| anyhow!("user {} not found", user_id))
587                    })
588                    .collect()
589            })?
590        })
591    }
592
593    pub fn fuzzy_search_users(
594        &mut self,
595        query: String,
596        cx: &mut ModelContext<Self>,
597    ) -> Task<Result<Vec<Arc<User>>>> {
598        self.load_users(proto::FuzzySearchUsers { query }, cx)
599    }
600
601    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
602        self.users.get(&user_id).cloned()
603    }
604
605    pub fn get_user_optimistic(
606        &mut self,
607        user_id: u64,
608        cx: &mut ModelContext<Self>,
609    ) -> Option<Arc<User>> {
610        if let Some(user) = self.users.get(&user_id).cloned() {
611            return Some(user);
612        }
613
614        self.get_user(user_id, cx).detach_and_log_err(cx);
615        None
616    }
617
618    pub fn get_user(
619        &mut self,
620        user_id: u64,
621        cx: &mut ModelContext<Self>,
622    ) -> Task<Result<Arc<User>>> {
623        if let Some(user) = self.users.get(&user_id).cloned() {
624            return Task::ready(Ok(user));
625        }
626
627        let load_users = self.get_users(vec![user_id], cx);
628        cx.spawn(move |this, mut cx| async move {
629            load_users.await?;
630            this.update(&mut cx, |this, _| {
631                this.users
632                    .get(&user_id)
633                    .cloned()
634                    .ok_or_else(|| anyhow!("server responded with no users"))
635            })?
636        })
637    }
638
639    pub fn current_user(&self) -> Option<Arc<User>> {
640        self.current_user.borrow().clone()
641    }
642
643    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
644        self.current_user.clone()
645    }
646
647    fn load_users(
648        &mut self,
649        request: impl RequestMessage<Response = UsersResponse>,
650        cx: &mut ModelContext<Self>,
651    ) -> Task<Result<Vec<Arc<User>>>> {
652        let client = self.client.clone();
653        cx.spawn(|this, mut cx| async move {
654            if let Some(rpc) = client.upgrade() {
655                let response = rpc.request(request).await.context("error loading users")?;
656                let users = response
657                    .users
658                    .into_iter()
659                    .map(User::new)
660                    .collect::<Vec<_>>();
661
662                this.update(&mut cx, |this, _| {
663                    for user in &users {
664                        this.users.insert(user.id, user.clone());
665                    }
666                })
667                .ok();
668
669                Ok(users)
670            } else {
671                Ok(Vec::new())
672            }
673        })
674    }
675
676    pub fn set_participant_indices(
677        &mut self,
678        participant_indices: HashMap<u64, ParticipantIndex>,
679        cx: &mut ModelContext<Self>,
680    ) {
681        if participant_indices != self.participant_indices {
682            self.participant_indices = participant_indices;
683            cx.emit(Event::ParticipantIndicesChanged);
684        }
685    }
686
687    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
688        &self.participant_indices
689    }
690
691    pub fn participant_names(
692        &self,
693        user_ids: impl Iterator<Item = u64>,
694        cx: &AppContext,
695    ) -> HashMap<u64, SharedString> {
696        let mut ret = HashMap::default();
697        let mut missing_user_ids = Vec::new();
698        for id in user_ids {
699            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
700                ret.insert(id, github_login.into());
701            } else {
702                missing_user_ids.push(id)
703            }
704        }
705        if !missing_user_ids.is_empty() {
706            let this = self.weak_self.clone();
707            cx.spawn(|mut cx| async move {
708                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
709                    .await
710            })
711            .detach_and_log_err(cx);
712        }
713        ret
714    }
715}
716
717impl User {
718    fn new(message: proto::User) -> Arc<Self> {
719        Arc::new(User {
720            id: message.id,
721            github_login: message.github_login,
722            avatar_uri: message.avatar_url.into(),
723        })
724    }
725}
726
727impl Contact {
728    async fn from_proto(
729        contact: proto::Contact,
730        user_store: &Model<UserStore>,
731        cx: &mut AsyncAppContext,
732    ) -> Result<Self> {
733        let user = user_store
734            .update(cx, |user_store, cx| {
735                user_store.get_user(contact.user_id, cx)
736            })?
737            .await?;
738        Ok(Self {
739            user,
740            online: contact.online,
741            busy: contact.busy,
742        })
743    }
744}
745
746impl Collaborator {
747    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
748        Ok(Self {
749            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
750            replica_id: message.replica_id as ReplicaId,
751            user_id: message.user_id as UserId,
752        })
753    }
754}