user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags::FeatureFlagAppExt;
  5use futures::{channel::mpsc, Future, StreamExt};
  6use gpui::{
  7    AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
  8    WeakModel,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::TryFutureExt as _;
 15
 16pub type UserId = u64;
 17
 18#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 19pub struct ChannelId(pub u64);
 20
 21impl std::fmt::Display for ChannelId {
 22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 23        self.0.fmt(f)
 24    }
 25}
 26
 27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 28pub struct ProjectId(pub u64);
 29
 30#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 31pub struct DevServerId(pub u64);
 32
 33#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 34pub struct RemoteProjectId(pub u64);
 35
 36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 37pub struct ParticipantIndex(pub u32);
 38
 39#[derive(Default, Debug)]
 40pub struct User {
 41    pub id: UserId,
 42    pub github_login: String,
 43    pub avatar_uri: SharedUri,
 44}
 45
 46#[derive(Clone, Debug, PartialEq, Eq)]
 47pub struct Collaborator {
 48    pub peer_id: proto::PeerId,
 49    pub replica_id: ReplicaId,
 50    pub user_id: UserId,
 51}
 52
 53impl PartialOrd for User {
 54    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 55        Some(self.cmp(other))
 56    }
 57}
 58
 59impl Ord for User {
 60    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 61        self.github_login.cmp(&other.github_login)
 62    }
 63}
 64
 65impl PartialEq for User {
 66    fn eq(&self, other: &Self) -> bool {
 67        self.id == other.id && self.github_login == other.github_login
 68    }
 69}
 70
 71impl Eq for User {}
 72
 73#[derive(Debug, PartialEq)]
 74pub struct Contact {
 75    pub user: Arc<User>,
 76    pub online: bool,
 77    pub busy: bool,
 78}
 79
 80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 81pub enum ContactRequestStatus {
 82    None,
 83    RequestSent,
 84    RequestReceived,
 85    RequestAccepted,
 86}
 87
 88pub struct UserStore {
 89    users: HashMap<u64, Arc<User>>,
 90    participant_indices: HashMap<u64, ParticipantIndex>,
 91    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 92    current_user: watch::Receiver<Option<Arc<User>>>,
 93    contacts: Vec<Arc<Contact>>,
 94    incoming_contact_requests: Vec<Arc<User>>,
 95    outgoing_contact_requests: Vec<Arc<User>>,
 96    pending_contact_requests: HashMap<u64, usize>,
 97    invite_info: Option<InviteInfo>,
 98    client: Weak<Client>,
 99    _maintain_contacts: Task<()>,
100    _maintain_current_user: Task<Result<()>>,
101    weak_self: WeakModel<Self>,
102}
103
104#[derive(Clone)]
105pub struct InviteInfo {
106    pub count: u32,
107    pub url: Arc<str>,
108}
109
110pub enum Event {
111    Contact {
112        user: Arc<User>,
113        kind: ContactEventKind,
114    },
115    ShowContacts,
116    ParticipantIndicesChanged,
117}
118
119#[derive(Clone, Copy)]
120pub enum ContactEventKind {
121    Requested,
122    Accepted,
123    Cancelled,
124}
125
126impl EventEmitter<Event> for UserStore {}
127
128enum UpdateContacts {
129    Update(proto::UpdateContacts),
130    Wait(postage::barrier::Sender),
131    Clear(postage::barrier::Sender),
132}
133
134impl UserStore {
135    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
136        let (mut current_user_tx, current_user_rx) = watch::channel();
137        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
138        let rpc_subscriptions = vec![
139            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
140            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
141            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
142        ];
143        Self {
144            users: Default::default(),
145            current_user: current_user_rx,
146            contacts: Default::default(),
147            incoming_contact_requests: Default::default(),
148            participant_indices: Default::default(),
149            outgoing_contact_requests: Default::default(),
150            invite_info: None,
151            client: Arc::downgrade(&client),
152            update_contacts_tx,
153            _maintain_contacts: cx.spawn(|this, mut cx| async move {
154                let _subscriptions = rpc_subscriptions;
155                while let Some(message) = update_contacts_rx.next().await {
156                    if let Ok(task) =
157                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
158                    {
159                        task.log_err().await;
160                    } else {
161                        break;
162                    }
163                }
164            }),
165            _maintain_current_user: cx.spawn(|this, mut cx| async move {
166                let mut status = client.status();
167                let weak = Arc::downgrade(&client);
168                drop(client);
169                while let Some(status) = status.next().await {
170                    // if the client is dropped, the app is shutting down.
171                    let Some(client) = weak.upgrade() else {
172                        return Ok(());
173                    };
174                    match status {
175                        Status::Connected { .. } => {
176                            if let Some(user_id) = client.user_id() {
177                                let fetch_user = if let Ok(fetch_user) = this
178                                    .update(&mut cx, |this, cx| {
179                                        this.get_user(user_id, cx).log_err()
180                                    }) {
181                                    fetch_user
182                                } else {
183                                    break;
184                                };
185                                let fetch_metrics_id =
186                                    client.request(proto::GetPrivateUserInfo {}).log_err();
187                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
188
189                                cx.update(|cx| {
190                                    if let Some(info) = info {
191                                        cx.update_flags(info.staff, info.flags);
192                                        client.telemetry.set_authenticated_user_info(
193                                            Some(info.metrics_id.clone()),
194                                            info.staff,
195                                        )
196                                    }
197                                })?;
198
199                                current_user_tx.send(user).await.ok();
200
201                                this.update(&mut cx, |_, cx| cx.notify())?;
202                            }
203                        }
204                        Status::SignedOut => {
205                            current_user_tx.send(None).await.ok();
206                            this.update(&mut cx, |this, cx| {
207                                cx.notify();
208                                this.clear_contacts()
209                            })?
210                            .await;
211                        }
212                        Status::ConnectionLost => {
213                            this.update(&mut cx, |this, cx| {
214                                cx.notify();
215                                this.clear_contacts()
216                            })?
217                            .await;
218                        }
219                        _ => {}
220                    }
221                }
222                Ok(())
223            }),
224            pending_contact_requests: Default::default(),
225            weak_self: cx.weak_model(),
226        }
227    }
228
229    #[cfg(feature = "test-support")]
230    pub fn clear_cache(&mut self) {
231        self.users.clear();
232    }
233
234    async fn handle_update_invite_info(
235        this: Model<Self>,
236        message: TypedEnvelope<proto::UpdateInviteInfo>,
237        _: Arc<Client>,
238        mut cx: AsyncAppContext,
239    ) -> Result<()> {
240        this.update(&mut cx, |this, cx| {
241            this.invite_info = Some(InviteInfo {
242                url: Arc::from(message.payload.url),
243                count: message.payload.count,
244            });
245            cx.notify();
246        })?;
247        Ok(())
248    }
249
250    async fn handle_show_contacts(
251        this: Model<Self>,
252        _: TypedEnvelope<proto::ShowContacts>,
253        _: Arc<Client>,
254        mut cx: AsyncAppContext,
255    ) -> Result<()> {
256        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
257        Ok(())
258    }
259
260    pub fn invite_info(&self) -> Option<&InviteInfo> {
261        self.invite_info.as_ref()
262    }
263
264    async fn handle_update_contacts(
265        this: Model<Self>,
266        message: TypedEnvelope<proto::UpdateContacts>,
267        _: Arc<Client>,
268        mut cx: AsyncAppContext,
269    ) -> Result<()> {
270        this.update(&mut cx, |this, _| {
271            this.update_contacts_tx
272                .unbounded_send(UpdateContacts::Update(message.payload))
273                .unwrap();
274        })?;
275        Ok(())
276    }
277
278    fn update_contacts(
279        &mut self,
280        message: UpdateContacts,
281        cx: &mut ModelContext<Self>,
282    ) -> Task<Result<()>> {
283        match message {
284            UpdateContacts::Wait(barrier) => {
285                drop(barrier);
286                Task::ready(Ok(()))
287            }
288            UpdateContacts::Clear(barrier) => {
289                self.contacts.clear();
290                self.incoming_contact_requests.clear();
291                self.outgoing_contact_requests.clear();
292                drop(barrier);
293                Task::ready(Ok(()))
294            }
295            UpdateContacts::Update(message) => {
296                let mut user_ids = HashSet::default();
297                for contact in &message.contacts {
298                    user_ids.insert(contact.user_id);
299                }
300                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
301                user_ids.extend(message.outgoing_requests.iter());
302
303                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
304                cx.spawn(|this, mut cx| async move {
305                    load_users.await?;
306
307                    // Users are fetched in parallel above and cached in call to get_users
308                    // No need to parallelize here
309                    let mut updated_contacts = Vec::new();
310                    let this = this
311                        .upgrade()
312                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
313                    for contact in message.contacts {
314                        updated_contacts.push(Arc::new(
315                            Contact::from_proto(contact, &this, &mut cx).await?,
316                        ));
317                    }
318
319                    let mut incoming_requests = Vec::new();
320                    for request in message.incoming_requests {
321                        incoming_requests.push({
322                            this.update(&mut cx, |this, cx| {
323                                this.get_user(request.requester_id, cx)
324                            })?
325                            .await?
326                        });
327                    }
328
329                    let mut outgoing_requests = Vec::new();
330                    for requested_user_id in message.outgoing_requests {
331                        outgoing_requests.push(
332                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
333                                .await?,
334                        );
335                    }
336
337                    let removed_contacts =
338                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
339                    let removed_incoming_requests =
340                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
341                    let removed_outgoing_requests =
342                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
343
344                    this.update(&mut cx, |this, cx| {
345                        // Remove contacts
346                        this.contacts
347                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
348                        // Update existing contacts and insert new ones
349                        for updated_contact in updated_contacts {
350                            match this.contacts.binary_search_by_key(
351                                &&updated_contact.user.github_login,
352                                |contact| &contact.user.github_login,
353                            ) {
354                                Ok(ix) => this.contacts[ix] = updated_contact,
355                                Err(ix) => this.contacts.insert(ix, updated_contact),
356                            }
357                        }
358
359                        // Remove incoming contact requests
360                        this.incoming_contact_requests.retain(|user| {
361                            if removed_incoming_requests.contains(&user.id) {
362                                cx.emit(Event::Contact {
363                                    user: user.clone(),
364                                    kind: ContactEventKind::Cancelled,
365                                });
366                                false
367                            } else {
368                                true
369                            }
370                        });
371                        // Update existing incoming requests and insert new ones
372                        for user in incoming_requests {
373                            match this
374                                .incoming_contact_requests
375                                .binary_search_by_key(&&user.github_login, |contact| {
376                                    &contact.github_login
377                                }) {
378                                Ok(ix) => this.incoming_contact_requests[ix] = user,
379                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
380                            }
381                        }
382
383                        // Remove outgoing contact requests
384                        this.outgoing_contact_requests
385                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
386                        // Update existing incoming requests and insert new ones
387                        for request in outgoing_requests {
388                            match this
389                                .outgoing_contact_requests
390                                .binary_search_by_key(&&request.github_login, |contact| {
391                                    &contact.github_login
392                                }) {
393                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
394                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
395                            }
396                        }
397
398                        cx.notify();
399                    })?;
400
401                    Ok(())
402                })
403            }
404        }
405    }
406
407    pub fn contacts(&self) -> &[Arc<Contact>] {
408        &self.contacts
409    }
410
411    pub fn has_contact(&self, user: &Arc<User>) -> bool {
412        self.contacts
413            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
414            .is_ok()
415    }
416
417    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
418        &self.incoming_contact_requests
419    }
420
421    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
422        &self.outgoing_contact_requests
423    }
424
425    pub fn is_contact_request_pending(&self, user: &User) -> bool {
426        self.pending_contact_requests.contains_key(&user.id)
427    }
428
429    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
430        if self
431            .contacts
432            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
433            .is_ok()
434        {
435            ContactRequestStatus::RequestAccepted
436        } else if self
437            .outgoing_contact_requests
438            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
439            .is_ok()
440        {
441            ContactRequestStatus::RequestSent
442        } else if self
443            .incoming_contact_requests
444            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
445            .is_ok()
446        {
447            ContactRequestStatus::RequestReceived
448        } else {
449            ContactRequestStatus::None
450        }
451    }
452
453    pub fn request_contact(
454        &mut self,
455        responder_id: u64,
456        cx: &mut ModelContext<Self>,
457    ) -> Task<Result<()>> {
458        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
459    }
460
461    pub fn remove_contact(
462        &mut self,
463        user_id: u64,
464        cx: &mut ModelContext<Self>,
465    ) -> Task<Result<()>> {
466        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
467    }
468
469    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
470        self.incoming_contact_requests
471            .iter()
472            .any(|user| user.id == user_id)
473    }
474
475    pub fn respond_to_contact_request(
476        &mut self,
477        requester_id: u64,
478        accept: bool,
479        cx: &mut ModelContext<Self>,
480    ) -> Task<Result<()>> {
481        self.perform_contact_request(
482            requester_id,
483            proto::RespondToContactRequest {
484                requester_id,
485                response: if accept {
486                    proto::ContactRequestResponse::Accept
487                } else {
488                    proto::ContactRequestResponse::Decline
489                } as i32,
490            },
491            cx,
492        )
493    }
494
495    pub fn dismiss_contact_request(
496        &mut self,
497        requester_id: u64,
498        cx: &mut ModelContext<Self>,
499    ) -> Task<Result<()>> {
500        let client = self.client.upgrade();
501        cx.spawn(move |_, _| async move {
502            client
503                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
504                .request(proto::RespondToContactRequest {
505                    requester_id,
506                    response: proto::ContactRequestResponse::Dismiss as i32,
507                })
508                .await?;
509            Ok(())
510        })
511    }
512
513    fn perform_contact_request<T: RequestMessage>(
514        &mut self,
515        user_id: u64,
516        request: T,
517        cx: &mut ModelContext<Self>,
518    ) -> Task<Result<()>> {
519        let client = self.client.upgrade();
520        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
521        cx.notify();
522
523        cx.spawn(move |this, mut cx| async move {
524            let response = client
525                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
526                .request(request)
527                .await;
528            this.update(&mut cx, |this, cx| {
529                if let Entry::Occupied(mut request_count) =
530                    this.pending_contact_requests.entry(user_id)
531                {
532                    *request_count.get_mut() -= 1;
533                    if *request_count.get() == 0 {
534                        request_count.remove();
535                    }
536                }
537                cx.notify();
538            })?;
539            response?;
540            Ok(())
541        })
542    }
543
544    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
545        let (tx, mut rx) = postage::barrier::channel();
546        self.update_contacts_tx
547            .unbounded_send(UpdateContacts::Clear(tx))
548            .unwrap();
549        async move {
550            rx.next().await;
551        }
552    }
553
554    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
555        let (tx, mut rx) = postage::barrier::channel();
556        self.update_contacts_tx
557            .unbounded_send(UpdateContacts::Wait(tx))
558            .unwrap();
559        async move {
560            rx.next().await;
561        }
562    }
563
564    pub fn get_users(
565        &mut self,
566        user_ids: Vec<u64>,
567        cx: &mut ModelContext<Self>,
568    ) -> Task<Result<Vec<Arc<User>>>> {
569        let mut user_ids_to_fetch = user_ids.clone();
570        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
571
572        cx.spawn(|this, mut cx| async move {
573            if !user_ids_to_fetch.is_empty() {
574                this.update(&mut cx, |this, cx| {
575                    this.load_users(
576                        proto::GetUsers {
577                            user_ids: user_ids_to_fetch,
578                        },
579                        cx,
580                    )
581                })?
582                .await?;
583            }
584
585            this.update(&mut cx, |this, _| {
586                user_ids
587                    .iter()
588                    .map(|user_id| {
589                        this.users
590                            .get(user_id)
591                            .cloned()
592                            .ok_or_else(|| anyhow!("user {} not found", user_id))
593                    })
594                    .collect()
595            })?
596        })
597    }
598
599    pub fn fuzzy_search_users(
600        &mut self,
601        query: String,
602        cx: &mut ModelContext<Self>,
603    ) -> Task<Result<Vec<Arc<User>>>> {
604        self.load_users(proto::FuzzySearchUsers { query }, cx)
605    }
606
607    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
608        self.users.get(&user_id).cloned()
609    }
610
611    pub fn get_user_optimistic(
612        &mut self,
613        user_id: u64,
614        cx: &mut ModelContext<Self>,
615    ) -> Option<Arc<User>> {
616        if let Some(user) = self.users.get(&user_id).cloned() {
617            return Some(user);
618        }
619
620        self.get_user(user_id, cx).detach_and_log_err(cx);
621        None
622    }
623
624    pub fn get_user(
625        &mut self,
626        user_id: u64,
627        cx: &mut ModelContext<Self>,
628    ) -> Task<Result<Arc<User>>> {
629        if let Some(user) = self.users.get(&user_id).cloned() {
630            return Task::ready(Ok(user));
631        }
632
633        let load_users = self.get_users(vec![user_id], cx);
634        cx.spawn(move |this, mut cx| async move {
635            load_users.await?;
636            this.update(&mut cx, |this, _| {
637                this.users
638                    .get(&user_id)
639                    .cloned()
640                    .ok_or_else(|| anyhow!("server responded with no users"))
641            })?
642        })
643    }
644
645    pub fn current_user(&self) -> Option<Arc<User>> {
646        self.current_user.borrow().clone()
647    }
648
649    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
650        self.current_user.clone()
651    }
652
653    fn load_users(
654        &mut self,
655        request: impl RequestMessage<Response = UsersResponse>,
656        cx: &mut ModelContext<Self>,
657    ) -> Task<Result<Vec<Arc<User>>>> {
658        let client = self.client.clone();
659        cx.spawn(|this, mut cx| async move {
660            if let Some(rpc) = client.upgrade() {
661                let response = rpc.request(request).await.context("error loading users")?;
662                let users = response
663                    .users
664                    .into_iter()
665                    .map(User::new)
666                    .collect::<Vec<_>>();
667
668                this.update(&mut cx, |this, _| {
669                    for user in &users {
670                        this.users.insert(user.id, user.clone());
671                    }
672                })
673                .ok();
674
675                Ok(users)
676            } else {
677                Ok(Vec::new())
678            }
679        })
680    }
681
682    pub fn set_participant_indices(
683        &mut self,
684        participant_indices: HashMap<u64, ParticipantIndex>,
685        cx: &mut ModelContext<Self>,
686    ) {
687        if participant_indices != self.participant_indices {
688            self.participant_indices = participant_indices;
689            cx.emit(Event::ParticipantIndicesChanged);
690        }
691    }
692
693    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
694        &self.participant_indices
695    }
696
697    pub fn participant_names(
698        &self,
699        user_ids: impl Iterator<Item = u64>,
700        cx: &AppContext,
701    ) -> HashMap<u64, SharedString> {
702        let mut ret = HashMap::default();
703        let mut missing_user_ids = Vec::new();
704        for id in user_ids {
705            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
706                ret.insert(id, github_login.into());
707            } else {
708                missing_user_ids.push(id)
709            }
710        }
711        if !missing_user_ids.is_empty() {
712            let this = self.weak_self.clone();
713            cx.spawn(|mut cx| async move {
714                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
715                    .await
716            })
717            .detach_and_log_err(cx);
718        }
719        ret
720    }
721}
722
723impl User {
724    fn new(message: proto::User) -> Arc<Self> {
725        Arc::new(User {
726            id: message.id,
727            github_login: message.github_login,
728            avatar_uri: message.avatar_url.into(),
729        })
730    }
731}
732
733impl Contact {
734    async fn from_proto(
735        contact: proto::Contact,
736        user_store: &Model<UserStore>,
737        cx: &mut AsyncAppContext,
738    ) -> Result<Self> {
739        let user = user_store
740            .update(cx, |user_store, cx| {
741                user_store.get_user(contact.user_id, cx)
742            })?
743            .await?;
744        Ok(Self {
745            user,
746            online: contact.online,
747            busy: contact.busy,
748        })
749    }
750}
751
752impl Collaborator {
753    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
754        Ok(Self {
755            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
756            replica_id: message.replica_id as ReplicaId,
757            user_id: message.user_id as UserId,
758        })
759    }
760}