user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use collections::{hash_map::Entry, HashMap, HashSet};
  4use feature_flags::FeatureFlagAppExt;
  5use futures::{channel::mpsc, Future, StreamExt};
  6use gpui::{
  7    AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
  8    WeakModel,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::TryFutureExt as _;
 15
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
 17
 18#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 19pub struct ChannelId(pub u64);
 20
 21impl std::fmt::Display for ChannelId {
 22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 23        self.0.fmt(f)
 24    }
 25}
 26
 27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 28pub struct ProjectId(pub u64);
 29
 30#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 31pub struct DevServerId(pub u64);
 32
 33#[derive(
 34    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 35)]
 36pub struct RemoteProjectId(pub u64);
 37
 38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 39pub struct ParticipantIndex(pub u32);
 40
 41#[derive(Default, Debug)]
 42pub struct User {
 43    pub id: UserId,
 44    pub github_login: String,
 45    pub avatar_uri: SharedUri,
 46}
 47
 48#[derive(Clone, Debug, PartialEq, Eq)]
 49pub struct Collaborator {
 50    pub peer_id: proto::PeerId,
 51    pub replica_id: ReplicaId,
 52    pub user_id: UserId,
 53}
 54
 55impl PartialOrd for User {
 56    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 57        Some(self.cmp(other))
 58    }
 59}
 60
 61impl Ord for User {
 62    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 63        self.github_login.cmp(&other.github_login)
 64    }
 65}
 66
 67impl PartialEq for User {
 68    fn eq(&self, other: &Self) -> bool {
 69        self.id == other.id && self.github_login == other.github_login
 70    }
 71}
 72
 73impl Eq for User {}
 74
 75#[derive(Debug, PartialEq)]
 76pub struct Contact {
 77    pub user: Arc<User>,
 78    pub online: bool,
 79    pub busy: bool,
 80}
 81
 82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 83pub enum ContactRequestStatus {
 84    None,
 85    RequestSent,
 86    RequestReceived,
 87    RequestAccepted,
 88}
 89
 90pub struct UserStore {
 91    users: HashMap<u64, Arc<User>>,
 92    participant_indices: HashMap<u64, ParticipantIndex>,
 93    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 94    current_user: watch::Receiver<Option<Arc<User>>>,
 95    contacts: Vec<Arc<Contact>>,
 96    incoming_contact_requests: Vec<Arc<User>>,
 97    outgoing_contact_requests: Vec<Arc<User>>,
 98    pending_contact_requests: HashMap<u64, usize>,
 99    invite_info: Option<InviteInfo>,
100    client: Weak<Client>,
101    _maintain_contacts: Task<()>,
102    _maintain_current_user: Task<Result<()>>,
103    weak_self: WeakModel<Self>,
104}
105
/// Invite information received from the server via `proto::UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the most recent invite-info message.
    pub count: u32,
    /// `url` field of the most recent invite-info message.
    pub url: Arc<str>,
}
111
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` replaces the stored indices.
    ParticipantIndicesChanged,
}
120
/// What happened to the contact request carried by `Event::Contact`.
#[derive(Copy, Clone)]
pub enum ContactEventKind {
    /// A contact request was made.
    Requested,
    /// A contact request was accepted.
    Accepted,
    /// A contact request was cancelled.
    Cancelled,
}
127
// Lets `UserStore` emit `Event` values through its model context.
impl EventEmitter<Event> for UserStore {}
129
/// Messages processed, in order, by the contact-maintenance task.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear all contact lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
135
136impl UserStore {
137    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
138        let (mut current_user_tx, current_user_rx) = watch::channel();
139        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
140        let rpc_subscriptions = vec![
141            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
142            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
143            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
144        ];
145        Self {
146            users: Default::default(),
147            current_user: current_user_rx,
148            contacts: Default::default(),
149            incoming_contact_requests: Default::default(),
150            participant_indices: Default::default(),
151            outgoing_contact_requests: Default::default(),
152            invite_info: None,
153            client: Arc::downgrade(&client),
154            update_contacts_tx,
155            _maintain_contacts: cx.spawn(|this, mut cx| async move {
156                let _subscriptions = rpc_subscriptions;
157                while let Some(message) = update_contacts_rx.next().await {
158                    if let Ok(task) =
159                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
160                    {
161                        task.log_err().await;
162                    } else {
163                        break;
164                    }
165                }
166            }),
167            _maintain_current_user: cx.spawn(|this, mut cx| async move {
168                let mut status = client.status();
169                let weak = Arc::downgrade(&client);
170                drop(client);
171                while let Some(status) = status.next().await {
172                    // if the client is dropped, the app is shutting down.
173                    let Some(client) = weak.upgrade() else {
174                        return Ok(());
175                    };
176                    match status {
177                        Status::Connected { .. } => {
178                            if let Some(user_id) = client.user_id() {
179                                let fetch_user = if let Ok(fetch_user) = this
180                                    .update(&mut cx, |this, cx| {
181                                        this.get_user(user_id, cx).log_err()
182                                    }) {
183                                    fetch_user
184                                } else {
185                                    break;
186                                };
187                                let fetch_metrics_id =
188                                    client.request(proto::GetPrivateUserInfo {}).log_err();
189                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
190
191                                cx.update(|cx| {
192                                    if let Some(info) = info {
193                                        cx.update_flags(info.staff, info.flags);
194                                        client.telemetry.set_authenticated_user_info(
195                                            Some(info.metrics_id.clone()),
196                                            info.staff,
197                                        )
198                                    }
199                                })?;
200
201                                current_user_tx.send(user).await.ok();
202
203                                this.update(&mut cx, |_, cx| cx.notify())?;
204                            }
205                        }
206                        Status::SignedOut => {
207                            current_user_tx.send(None).await.ok();
208                            this.update(&mut cx, |this, cx| {
209                                cx.notify();
210                                this.clear_contacts()
211                            })?
212                            .await;
213                        }
214                        Status::ConnectionLost => {
215                            this.update(&mut cx, |this, cx| {
216                                cx.notify();
217                                this.clear_contacts()
218                            })?
219                            .await;
220                        }
221                        _ => {}
222                    }
223                }
224                Ok(())
225            }),
226            pending_contact_requests: Default::default(),
227            weak_self: cx.weak_model(),
228        }
229    }
230
231    #[cfg(feature = "test-support")]
232    pub fn clear_cache(&mut self) {
233        self.users.clear();
234    }
235
236    async fn handle_update_invite_info(
237        this: Model<Self>,
238        message: TypedEnvelope<proto::UpdateInviteInfo>,
239        _: Arc<Client>,
240        mut cx: AsyncAppContext,
241    ) -> Result<()> {
242        this.update(&mut cx, |this, cx| {
243            this.invite_info = Some(InviteInfo {
244                url: Arc::from(message.payload.url),
245                count: message.payload.count,
246            });
247            cx.notify();
248        })?;
249        Ok(())
250    }
251
252    async fn handle_show_contacts(
253        this: Model<Self>,
254        _: TypedEnvelope<proto::ShowContacts>,
255        _: Arc<Client>,
256        mut cx: AsyncAppContext,
257    ) -> Result<()> {
258        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
259        Ok(())
260    }
261
262    pub fn invite_info(&self) -> Option<&InviteInfo> {
263        self.invite_info.as_ref()
264    }
265
266    async fn handle_update_contacts(
267        this: Model<Self>,
268        message: TypedEnvelope<proto::UpdateContacts>,
269        _: Arc<Client>,
270        mut cx: AsyncAppContext,
271    ) -> Result<()> {
272        this.update(&mut cx, |this, _| {
273            this.update_contacts_tx
274                .unbounded_send(UpdateContacts::Update(message.payload))
275                .unwrap();
276        })?;
277        Ok(())
278    }
279
280    fn update_contacts(
281        &mut self,
282        message: UpdateContacts,
283        cx: &mut ModelContext<Self>,
284    ) -> Task<Result<()>> {
285        match message {
286            UpdateContacts::Wait(barrier) => {
287                drop(barrier);
288                Task::ready(Ok(()))
289            }
290            UpdateContacts::Clear(barrier) => {
291                self.contacts.clear();
292                self.incoming_contact_requests.clear();
293                self.outgoing_contact_requests.clear();
294                drop(barrier);
295                Task::ready(Ok(()))
296            }
297            UpdateContacts::Update(message) => {
298                let mut user_ids = HashSet::default();
299                for contact in &message.contacts {
300                    user_ids.insert(contact.user_id);
301                }
302                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
303                user_ids.extend(message.outgoing_requests.iter());
304
305                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
306                cx.spawn(|this, mut cx| async move {
307                    load_users.await?;
308
309                    // Users are fetched in parallel above and cached in call to get_users
310                    // No need to parallelize here
311                    let mut updated_contacts = Vec::new();
312                    let this = this
313                        .upgrade()
314                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
315                    for contact in message.contacts {
316                        updated_contacts.push(Arc::new(
317                            Contact::from_proto(contact, &this, &mut cx).await?,
318                        ));
319                    }
320
321                    let mut incoming_requests = Vec::new();
322                    for request in message.incoming_requests {
323                        incoming_requests.push({
324                            this.update(&mut cx, |this, cx| {
325                                this.get_user(request.requester_id, cx)
326                            })?
327                            .await?
328                        });
329                    }
330
331                    let mut outgoing_requests = Vec::new();
332                    for requested_user_id in message.outgoing_requests {
333                        outgoing_requests.push(
334                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
335                                .await?,
336                        );
337                    }
338
339                    let removed_contacts =
340                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
341                    let removed_incoming_requests =
342                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
343                    let removed_outgoing_requests =
344                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
345
346                    this.update(&mut cx, |this, cx| {
347                        // Remove contacts
348                        this.contacts
349                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
350                        // Update existing contacts and insert new ones
351                        for updated_contact in updated_contacts {
352                            match this.contacts.binary_search_by_key(
353                                &&updated_contact.user.github_login,
354                                |contact| &contact.user.github_login,
355                            ) {
356                                Ok(ix) => this.contacts[ix] = updated_contact,
357                                Err(ix) => this.contacts.insert(ix, updated_contact),
358                            }
359                        }
360
361                        // Remove incoming contact requests
362                        this.incoming_contact_requests.retain(|user| {
363                            if removed_incoming_requests.contains(&user.id) {
364                                cx.emit(Event::Contact {
365                                    user: user.clone(),
366                                    kind: ContactEventKind::Cancelled,
367                                });
368                                false
369                            } else {
370                                true
371                            }
372                        });
373                        // Update existing incoming requests and insert new ones
374                        for user in incoming_requests {
375                            match this
376                                .incoming_contact_requests
377                                .binary_search_by_key(&&user.github_login, |contact| {
378                                    &contact.github_login
379                                }) {
380                                Ok(ix) => this.incoming_contact_requests[ix] = user,
381                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
382                            }
383                        }
384
385                        // Remove outgoing contact requests
386                        this.outgoing_contact_requests
387                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
388                        // Update existing incoming requests and insert new ones
389                        for request in outgoing_requests {
390                            match this
391                                .outgoing_contact_requests
392                                .binary_search_by_key(&&request.github_login, |contact| {
393                                    &contact.github_login
394                                }) {
395                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
396                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
397                            }
398                        }
399
400                        cx.notify();
401                    })?;
402
403                    Ok(())
404                })
405            }
406        }
407    }
408
409    pub fn contacts(&self) -> &[Arc<Contact>] {
410        &self.contacts
411    }
412
413    pub fn has_contact(&self, user: &Arc<User>) -> bool {
414        self.contacts
415            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
416            .is_ok()
417    }
418
419    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
420        &self.incoming_contact_requests
421    }
422
423    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
424        &self.outgoing_contact_requests
425    }
426
427    pub fn is_contact_request_pending(&self, user: &User) -> bool {
428        self.pending_contact_requests.contains_key(&user.id)
429    }
430
431    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
432        if self
433            .contacts
434            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
435            .is_ok()
436        {
437            ContactRequestStatus::RequestAccepted
438        } else if self
439            .outgoing_contact_requests
440            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
441            .is_ok()
442        {
443            ContactRequestStatus::RequestSent
444        } else if self
445            .incoming_contact_requests
446            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
447            .is_ok()
448        {
449            ContactRequestStatus::RequestReceived
450        } else {
451            ContactRequestStatus::None
452        }
453    }
454
455    pub fn request_contact(
456        &mut self,
457        responder_id: u64,
458        cx: &mut ModelContext<Self>,
459    ) -> Task<Result<()>> {
460        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
461    }
462
463    pub fn remove_contact(
464        &mut self,
465        user_id: u64,
466        cx: &mut ModelContext<Self>,
467    ) -> Task<Result<()>> {
468        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
469    }
470
471    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
472        self.incoming_contact_requests
473            .iter()
474            .any(|user| user.id == user_id)
475    }
476
477    pub fn respond_to_contact_request(
478        &mut self,
479        requester_id: u64,
480        accept: bool,
481        cx: &mut ModelContext<Self>,
482    ) -> Task<Result<()>> {
483        self.perform_contact_request(
484            requester_id,
485            proto::RespondToContactRequest {
486                requester_id,
487                response: if accept {
488                    proto::ContactRequestResponse::Accept
489                } else {
490                    proto::ContactRequestResponse::Decline
491                } as i32,
492            },
493            cx,
494        )
495    }
496
497    pub fn dismiss_contact_request(
498        &mut self,
499        requester_id: u64,
500        cx: &mut ModelContext<Self>,
501    ) -> Task<Result<()>> {
502        let client = self.client.upgrade();
503        cx.spawn(move |_, _| async move {
504            client
505                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
506                .request(proto::RespondToContactRequest {
507                    requester_id,
508                    response: proto::ContactRequestResponse::Dismiss as i32,
509                })
510                .await?;
511            Ok(())
512        })
513    }
514
515    fn perform_contact_request<T: RequestMessage>(
516        &mut self,
517        user_id: u64,
518        request: T,
519        cx: &mut ModelContext<Self>,
520    ) -> Task<Result<()>> {
521        let client = self.client.upgrade();
522        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
523        cx.notify();
524
525        cx.spawn(move |this, mut cx| async move {
526            let response = client
527                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
528                .request(request)
529                .await;
530            this.update(&mut cx, |this, cx| {
531                if let Entry::Occupied(mut request_count) =
532                    this.pending_contact_requests.entry(user_id)
533                {
534                    *request_count.get_mut() -= 1;
535                    if *request_count.get() == 0 {
536                        request_count.remove();
537                    }
538                }
539                cx.notify();
540            })?;
541            response?;
542            Ok(())
543        })
544    }
545
546    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
547        let (tx, mut rx) = postage::barrier::channel();
548        self.update_contacts_tx
549            .unbounded_send(UpdateContacts::Clear(tx))
550            .unwrap();
551        async move {
552            rx.next().await;
553        }
554    }
555
556    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
557        let (tx, mut rx) = postage::barrier::channel();
558        self.update_contacts_tx
559            .unbounded_send(UpdateContacts::Wait(tx))
560            .unwrap();
561        async move {
562            rx.next().await;
563        }
564    }
565
566    pub fn get_users(
567        &mut self,
568        user_ids: Vec<u64>,
569        cx: &mut ModelContext<Self>,
570    ) -> Task<Result<Vec<Arc<User>>>> {
571        let mut user_ids_to_fetch = user_ids.clone();
572        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
573
574        cx.spawn(|this, mut cx| async move {
575            if !user_ids_to_fetch.is_empty() {
576                this.update(&mut cx, |this, cx| {
577                    this.load_users(
578                        proto::GetUsers {
579                            user_ids: user_ids_to_fetch,
580                        },
581                        cx,
582                    )
583                })?
584                .await?;
585            }
586
587            this.update(&mut cx, |this, _| {
588                user_ids
589                    .iter()
590                    .map(|user_id| {
591                        this.users
592                            .get(user_id)
593                            .cloned()
594                            .ok_or_else(|| anyhow!("user {} not found", user_id))
595                    })
596                    .collect()
597            })?
598        })
599    }
600
601    pub fn fuzzy_search_users(
602        &mut self,
603        query: String,
604        cx: &mut ModelContext<Self>,
605    ) -> Task<Result<Vec<Arc<User>>>> {
606        self.load_users(proto::FuzzySearchUsers { query }, cx)
607    }
608
609    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
610        self.users.get(&user_id).cloned()
611    }
612
613    pub fn get_user_optimistic(
614        &mut self,
615        user_id: u64,
616        cx: &mut ModelContext<Self>,
617    ) -> Option<Arc<User>> {
618        if let Some(user) = self.users.get(&user_id).cloned() {
619            return Some(user);
620        }
621
622        self.get_user(user_id, cx).detach_and_log_err(cx);
623        None
624    }
625
626    pub fn get_user(
627        &mut self,
628        user_id: u64,
629        cx: &mut ModelContext<Self>,
630    ) -> Task<Result<Arc<User>>> {
631        if let Some(user) = self.users.get(&user_id).cloned() {
632            return Task::ready(Ok(user));
633        }
634
635        let load_users = self.get_users(vec![user_id], cx);
636        cx.spawn(move |this, mut cx| async move {
637            load_users.await?;
638            this.update(&mut cx, |this, _| {
639                this.users
640                    .get(&user_id)
641                    .cloned()
642                    .ok_or_else(|| anyhow!("server responded with no users"))
643            })?
644        })
645    }
646
647    pub fn current_user(&self) -> Option<Arc<User>> {
648        self.current_user.borrow().clone()
649    }
650
651    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
652        self.current_user.clone()
653    }
654
655    fn load_users(
656        &mut self,
657        request: impl RequestMessage<Response = UsersResponse>,
658        cx: &mut ModelContext<Self>,
659    ) -> Task<Result<Vec<Arc<User>>>> {
660        let client = self.client.clone();
661        cx.spawn(|this, mut cx| async move {
662            if let Some(rpc) = client.upgrade() {
663                let response = rpc.request(request).await.context("error loading users")?;
664                let users = response
665                    .users
666                    .into_iter()
667                    .map(User::new)
668                    .collect::<Vec<_>>();
669
670                this.update(&mut cx, |this, _| {
671                    for user in &users {
672                        this.users.insert(user.id, user.clone());
673                    }
674                })
675                .ok();
676
677                Ok(users)
678            } else {
679                Ok(Vec::new())
680            }
681        })
682    }
683
684    pub fn set_participant_indices(
685        &mut self,
686        participant_indices: HashMap<u64, ParticipantIndex>,
687        cx: &mut ModelContext<Self>,
688    ) {
689        if participant_indices != self.participant_indices {
690            self.participant_indices = participant_indices;
691            cx.emit(Event::ParticipantIndicesChanged);
692        }
693    }
694
695    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
696        &self.participant_indices
697    }
698
699    pub fn participant_names(
700        &self,
701        user_ids: impl Iterator<Item = u64>,
702        cx: &AppContext,
703    ) -> HashMap<u64, SharedString> {
704        let mut ret = HashMap::default();
705        let mut missing_user_ids = Vec::new();
706        for id in user_ids {
707            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
708                ret.insert(id, github_login.into());
709            } else {
710                missing_user_ids.push(id)
711            }
712        }
713        if !missing_user_ids.is_empty() {
714            let this = self.weak_self.clone();
715            cx.spawn(|mut cx| async move {
716                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
717                    .await
718            })
719            .detach_and_log_err(cx);
720        }
721        ret
722    }
723}
724
725impl User {
726    fn new(message: proto::User) -> Arc<Self> {
727        Arc::new(User {
728            id: message.id,
729            github_login: message.github_login,
730            avatar_uri: message.avatar_url.into(),
731        })
732    }
733}
734
735impl Contact {
736    async fn from_proto(
737        contact: proto::Contact,
738        user_store: &Model<UserStore>,
739        cx: &mut AsyncAppContext,
740    ) -> Result<Self> {
741        let user = user_store
742            .update(cx, |user_store, cx| {
743                user_store.get_user(contact.user_id, cx)
744            })?
745            .await?;
746        Ok(Self {
747            user,
748            online: contact.online,
749            busy: contact.busy,
750        })
751    }
752}
753
754impl Collaborator {
755    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
756        Ok(Self {
757            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
758            replica_id: message.replica_id as ReplicaId,
759            user_id: message.user_id as UserId,
760        })
761    }
762}