user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use collections::{HashMap, HashSet, hash_map::Entry};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{Future, StreamExt, channel::mpsc};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::TryFutureExt as _;
 15
/// Numeric identifier of a user.
pub type UserId = u64;
 17
/// Newtype wrapper around the `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
 29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 30pub struct ProjectId(pub u64);
 31
 32impl ProjectId {
 33    pub fn to_proto(&self) -> u64 {
 34        self.0
 35    }
 36}
 37
/// Newtype wrapper around the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
 42
/// Index assigned to a participant; stored per user id in
/// `UserStore::participant_indices`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 45
/// A user record, built from a `proto::User` message by `User::new`.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user; used as the key of the `UserStore` user cache.
    pub id: UserId,
    /// GitHub login; the key by which contact lists in `UserStore` are sorted.
    pub github_login: String,
    /// Avatar location, converted from the `avatar_url` field of the proto message.
    pub avatar_uri: SharedUri,
    /// Optional name, copied verbatim from the proto message.
    pub name: Option<String>,
    /// Optional email, copied verbatim from the proto message.
    pub email: Option<String>,
}
 54
/// A collaborator, built from a `proto::Collaborator` message by
/// `Collaborator::from_proto`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id from the proto message; `from_proto` fails when it is absent.
    pub peer_id: proto::PeerId,
    /// Replica id from the proto message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// The `is_host` flag from the proto message.
    pub is_host: bool,
}
 62
 63impl PartialOrd for User {
 64    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 65        Some(self.cmp(other))
 66    }
 67}
 68
 69impl Ord for User {
 70    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 71        self.github_login.cmp(&other.github_login)
 72    }
 73}
 74
 75impl PartialEq for User {
 76    fn eq(&self, other: &Self) -> bool {
 77        self.id == other.id && self.github_login == other.github_login
 78    }
 79}
 80
 81impl Eq for User {}
 82
/// A contact of the current user, built from a `proto::Contact` message by
/// `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to, resolved through the `UserStore`.
    pub user: Arc<User>,
    /// The `online` flag from the proto message.
    pub online: bool,
    /// The `busy` flag from the proto message.
    pub busy: bool,
}
 89
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user appears in the contact list.
    RequestAccepted,
}
 97
/// Holds the user cache, the contact lists, and plan/usage information
/// received through the client, and keeps them up to date via two background
/// tasks.
pub struct UserStore {
    /// Cache of known users keyed by user id (filled by `insert`).
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id, maintained alongside `users`.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id (see `set_participant_indices`).
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan reported by the last `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// Trial start time from the last `UpdateUserPlan` message, if any.
    trial_started_at: Option<DateTime<Utc>>,
    /// Usage figures and limits from the last `UpdateUserPlan` message that
    /// carried usage data.
    model_request_usage_amount: Option<u32>,
    model_request_usage_limit: Option<proto::UsageLimit>,
    edit_predictions_usage_amount: Option<u32>,
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    /// Usage-based-billing flag from the last `UpdateUserPlan` message.
    is_usage_based_billing_enabled: Option<bool>,
    /// Receiving half of the watch channel that `_maintain_current_user`
    /// publishes the signed-in user to.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: not known yet (also reset on sign-out). Inner value:
    /// when the terms of service were accepted, if they were.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id
    /// (see `perform_contact_request`).
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the last `UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak reference to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store, used to spawn work from `&App` contexts.
    weak_self: WeakEntity<Self>,
}
122
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
128
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind`
    /// says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different map.
    ParticipantIndicesChanged,
    /// Emitted when terms-of-service information is updated or reset.
    PrivateUserInfoUpdated,
}
138
/// What happened to a contact request in an `Event::Contact`.
///
/// NOTE(review): within this file only `Cancelled` is emitted (when an
/// incoming request is removed); `Requested` and `Accepted` are presumably
/// produced elsewhere — confirm.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
145
// Lets `UserStore` emit `Event` values through `cx.emit(...)`.
impl EventEmitter<Event> for UserStore {}
147
/// Messages processed in order by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, which tells the waiter that every
    /// message queued before this one has been processed.
    Wait(postage::barrier::Sender),
    /// Empty the contact list and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
153
154impl UserStore {
    /// Creates a store bound to `client`.
    ///
    /// Registers the message handlers for plan, contacts, invite-info and
    /// show-contacts messages, and spawns two background tasks:
    ///
    /// * `_maintain_contacts` applies queued `UpdateContacts` messages one at
    ///   a time, logging (not propagating) any error.
    /// * `_maintain_current_user` follows the client's status stream and
    ///   refreshes or clears the current user, terms-of-service state and
    ///   contacts accordingly.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            trial_started_at: None,
            model_request_usage_amount: None,
            model_request_usage_limit: None,
            edit_predictions_usage_amount: None,
            edit_predictions_usage_limit: None,
            is_usage_based_billing_enabled: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moving the subscriptions into the task ties their lifetime to it.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Each update is awaited before the next message is taken,
                    // so updates are applied strictly in arrival order.
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store is gone; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold the client only weakly so this task does not keep it alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // If the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both requests concurrently; each yields `None` on
                                // failure because of `log_err`.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting `ZED_IGNORE_ACCEPTED_TOS`
                                            // makes the store behave as if the terms had not
                                            // been accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the user (or `None` if the fetch failed) to watchers.
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                // Back to "unknown" until the next sign-in.
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
281
282    #[cfg(feature = "test-support")]
283    pub fn clear_cache(&mut self) {
284        self.users.clear();
285        self.by_github_login.clear();
286    }
287
288    async fn handle_update_invite_info(
289        this: Entity<Self>,
290        message: TypedEnvelope<proto::UpdateInviteInfo>,
291        mut cx: AsyncApp,
292    ) -> Result<()> {
293        this.update(&mut cx, |this, cx| {
294            this.invite_info = Some(InviteInfo {
295                url: Arc::from(message.payload.url),
296                count: message.payload.count,
297            });
298            cx.notify();
299        })?;
300        Ok(())
301    }
302
303    async fn handle_show_contacts(
304        this: Entity<Self>,
305        _: TypedEnvelope<proto::ShowContacts>,
306        mut cx: AsyncApp,
307    ) -> Result<()> {
308        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
309        Ok(())
310    }
311
312    pub fn invite_info(&self) -> Option<&InviteInfo> {
313        self.invite_info.as_ref()
314    }
315
316    async fn handle_update_contacts(
317        this: Entity<Self>,
318        message: TypedEnvelope<proto::UpdateContacts>,
319        mut cx: AsyncApp,
320    ) -> Result<()> {
321        this.update(&mut cx, |this, _| {
322            this.update_contacts_tx
323                .unbounded_send(UpdateContacts::Update(message.payload))
324                .unwrap();
325        })?;
326        Ok(())
327    }
328
329    async fn handle_update_plan(
330        this: Entity<Self>,
331        message: TypedEnvelope<proto::UpdateUserPlan>,
332        mut cx: AsyncApp,
333    ) -> Result<()> {
334        this.update(&mut cx, |this, cx| {
335            this.current_plan = Some(message.payload.plan());
336            this.trial_started_at = message
337                .payload
338                .trial_started_at
339                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
340            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
341
342            if let Some(usage) = message.payload.usage {
343                this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
344                this.model_request_usage_limit = usage.model_requests_usage_limit;
345                this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
346                this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
347            }
348
349            cx.notify();
350        })?;
351        Ok(())
352    }
353
    /// Applies one queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops its barrier, signalling that the queue has been
    ///   processed up to this point.
    /// * `Clear` empties the contact list and both request lists, then drops
    ///   its barrier.
    /// * `Update` loads every referenced user, then removes, replaces and
    ///   inserts entries so that each list stays sorted by `github_login`.
    ///
    /// Returns a task that resolves once the message has been fully applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the message so they can be
                // loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones, keeping the
                        // list sorted by `github_login`
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled`
                        // event for each one that is dropped
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
475
476    pub fn contacts(&self) -> &[Arc<Contact>] {
477        &self.contacts
478    }
479
480    pub fn has_contact(&self, user: &Arc<User>) -> bool {
481        self.contacts
482            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
483            .is_ok()
484    }
485
486    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
487        &self.incoming_contact_requests
488    }
489
490    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
491        &self.outgoing_contact_requests
492    }
493
494    pub fn is_contact_request_pending(&self, user: &User) -> bool {
495        self.pending_contact_requests.contains_key(&user.id)
496    }
497
498    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
499        if self
500            .contacts
501            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
502            .is_ok()
503        {
504            ContactRequestStatus::RequestAccepted
505        } else if self
506            .outgoing_contact_requests
507            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
508            .is_ok()
509        {
510            ContactRequestStatus::RequestSent
511        } else if self
512            .incoming_contact_requests
513            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
514            .is_ok()
515        {
516            ContactRequestStatus::RequestReceived
517        } else {
518            ContactRequestStatus::None
519        }
520    }
521
522    pub fn request_contact(
523        &mut self,
524        responder_id: u64,
525        cx: &mut Context<Self>,
526    ) -> Task<Result<()>> {
527        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
528    }
529
530    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
531        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
532    }
533
534    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
535        self.incoming_contact_requests
536            .iter()
537            .any(|user| user.id == user_id)
538    }
539
540    pub fn respond_to_contact_request(
541        &mut self,
542        requester_id: u64,
543        accept: bool,
544        cx: &mut Context<Self>,
545    ) -> Task<Result<()>> {
546        self.perform_contact_request(
547            requester_id,
548            proto::RespondToContactRequest {
549                requester_id,
550                response: if accept {
551                    proto::ContactRequestResponse::Accept
552                } else {
553                    proto::ContactRequestResponse::Decline
554                } as i32,
555            },
556            cx,
557        )
558    }
559
560    pub fn dismiss_contact_request(
561        &self,
562        requester_id: u64,
563        cx: &Context<Self>,
564    ) -> Task<Result<()>> {
565        let client = self.client.upgrade();
566        cx.spawn(async move |_, _| {
567            client
568                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
569                .request(proto::RespondToContactRequest {
570                    requester_id,
571                    response: proto::ContactRequestResponse::Dismiss as i32,
572                })
573                .await?;
574            Ok(())
575        })
576    }
577
578    fn perform_contact_request<T: RequestMessage>(
579        &mut self,
580        user_id: u64,
581        request: T,
582        cx: &mut Context<Self>,
583    ) -> Task<Result<()>> {
584        let client = self.client.upgrade();
585        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
586        cx.notify();
587
588        cx.spawn(async move |this, cx| {
589            let response = client
590                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
591                .request(request)
592                .await;
593            this.update(cx, |this, cx| {
594                if let Entry::Occupied(mut request_count) =
595                    this.pending_contact_requests.entry(user_id)
596                {
597                    *request_count.get_mut() -= 1;
598                    if *request_count.get() == 0 {
599                        request_count.remove();
600                    }
601                }
602                cx.notify();
603            })?;
604            response?;
605            Ok(())
606        })
607    }
608
609    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
610        let (tx, mut rx) = postage::barrier::channel();
611        self.update_contacts_tx
612            .unbounded_send(UpdateContacts::Clear(tx))
613            .unwrap();
614        async move {
615            rx.next().await;
616        }
617    }
618
619    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
620        let (tx, mut rx) = postage::barrier::channel();
621        self.update_contacts_tx
622            .unbounded_send(UpdateContacts::Wait(tx))
623            .unwrap();
624        async move {
625            rx.next().await;
626        }
627    }
628
629    pub fn get_users(
630        &self,
631        user_ids: Vec<u64>,
632        cx: &Context<Self>,
633    ) -> Task<Result<Vec<Arc<User>>>> {
634        let mut user_ids_to_fetch = user_ids.clone();
635        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
636
637        cx.spawn(async move |this, cx| {
638            if !user_ids_to_fetch.is_empty() {
639                this.update(cx, |this, cx| {
640                    this.load_users(
641                        proto::GetUsers {
642                            user_ids: user_ids_to_fetch,
643                        },
644                        cx,
645                    )
646                })?
647                .await?;
648            }
649
650            this.update(cx, |this, _| {
651                user_ids
652                    .iter()
653                    .map(|user_id| {
654                        this.users
655                            .get(user_id)
656                            .cloned()
657                            .ok_or_else(|| anyhow!("user {} not found", user_id))
658                    })
659                    .collect()
660            })?
661        })
662    }
663
664    pub fn fuzzy_search_users(
665        &self,
666        query: String,
667        cx: &Context<Self>,
668    ) -> Task<Result<Vec<Arc<User>>>> {
669        self.load_users(proto::FuzzySearchUsers { query }, cx)
670    }
671
672    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
673        self.users.get(&user_id).cloned()
674    }
675
676    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
677        if let Some(user) = self.users.get(&user_id).cloned() {
678            return Some(user);
679        }
680
681        self.get_user(user_id, cx).detach_and_log_err(cx);
682        None
683    }
684
685    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
686        if let Some(user) = self.users.get(&user_id).cloned() {
687            return Task::ready(Ok(user));
688        }
689
690        let load_users = self.get_users(vec![user_id], cx);
691        cx.spawn(async move |this, cx| {
692            load_users.await?;
693            this.update(cx, |this, _| {
694                this.users
695                    .get(&user_id)
696                    .cloned()
697                    .ok_or_else(|| anyhow!("server responded with no users"))
698            })?
699        })
700    }
701
702    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
703        self.by_github_login
704            .get(github_login)
705            .and_then(|id| self.users.get(id).cloned())
706    }
707
708    pub fn current_user(&self) -> Option<Arc<User>> {
709        self.current_user.borrow().clone()
710    }
711
712    pub fn current_plan(&self) -> Option<proto::Plan> {
713        self.current_plan
714    }
715
716    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
717        self.trial_started_at
718    }
719
720    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
721        self.is_usage_based_billing_enabled
722    }
723
724    pub fn model_request_usage_amount(&self) -> Option<u32> {
725        self.model_request_usage_amount
726    }
727
728    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
729        self.model_request_usage_limit.clone()
730    }
731
732    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
733        self.edit_predictions_usage_amount
734    }
735
736    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
737        self.edit_predictions_usage_limit.clone()
738    }
739
740    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
741        self.current_user.clone()
742    }
743
744    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
745        self.accepted_tos_at
746            .map(|accepted_tos_at| accepted_tos_at.is_some())
747    }
748
749    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
750        if self.current_user().is_none() {
751            return Task::ready(Err(anyhow!("no current user")));
752        };
753
754        let client = self.client.clone();
755        cx.spawn(async move |this, cx| {
756            if let Some(client) = client.upgrade() {
757                let response = client
758                    .request(proto::AcceptTermsOfService {})
759                    .await
760                    .context("error accepting tos")?;
761
762                this.update(cx, |this, cx| {
763                    this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
764                    cx.emit(Event::PrivateUserInfoUpdated);
765                })
766            } else {
767                Err(anyhow!("client not found"))
768            }
769        })
770    }
771
772    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
773        self.accepted_tos_at = Some(
774            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
775        );
776    }
777
778    fn load_users(
779        &self,
780        request: impl RequestMessage<Response = UsersResponse>,
781        cx: &Context<Self>,
782    ) -> Task<Result<Vec<Arc<User>>>> {
783        let client = self.client.clone();
784        cx.spawn(async move |this, cx| {
785            if let Some(rpc) = client.upgrade() {
786                let response = rpc.request(request).await.context("error loading users")?;
787                let users = response.users;
788
789                this.update(cx, |this, _| this.insert(users))
790            } else {
791                Ok(Vec::new())
792            }
793        })
794    }
795
796    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
797        let mut ret = Vec::with_capacity(users.len());
798        for user in users {
799            let user = User::new(user);
800            if let Some(old) = self.users.insert(user.id, user.clone()) {
801                if old.github_login != user.github_login {
802                    self.by_github_login.remove(&old.github_login);
803                }
804            }
805            self.by_github_login
806                .insert(user.github_login.clone(), user.id);
807            ret.push(user)
808        }
809        ret
810    }
811
812    pub fn set_participant_indices(
813        &mut self,
814        participant_indices: HashMap<u64, ParticipantIndex>,
815        cx: &mut Context<Self>,
816    ) {
817        if participant_indices != self.participant_indices {
818            self.participant_indices = participant_indices;
819            cx.emit(Event::ParticipantIndicesChanged);
820        }
821    }
822
823    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
824        &self.participant_indices
825    }
826
827    pub fn participant_names(
828        &self,
829        user_ids: impl Iterator<Item = u64>,
830        cx: &App,
831    ) -> HashMap<u64, SharedString> {
832        let mut ret = HashMap::default();
833        let mut missing_user_ids = Vec::new();
834        for id in user_ids {
835            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
836                ret.insert(id, github_login.into());
837            } else {
838                missing_user_ids.push(id)
839            }
840        }
841        if !missing_user_ids.is_empty() {
842            let this = self.weak_self.clone();
843            cx.spawn(async move |cx| {
844                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
845                    .await
846            })
847            .detach_and_log_err(cx);
848        }
849        ret
850    }
851}
852
853impl User {
854    fn new(message: proto::User) -> Arc<Self> {
855        Arc::new(User {
856            id: message.id,
857            github_login: message.github_login,
858            avatar_uri: message.avatar_url.into(),
859            name: message.name,
860            email: message.email,
861        })
862    }
863}
864
865impl Contact {
866    async fn from_proto(
867        contact: proto::Contact,
868        user_store: &Entity<UserStore>,
869        cx: &mut AsyncApp,
870    ) -> Result<Self> {
871        let user = user_store
872            .update(cx, |user_store, cx| {
873                user_store.get_user(contact.user_id, cx)
874            })?
875            .await?;
876        Ok(Self {
877            user,
878            online: contact.online,
879            busy: contact.busy,
880        })
881    }
882}
883
884impl Collaborator {
885    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
886        Ok(Self {
887            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
888            replica_id: message.replica_id as ReplicaId,
889            user_id: message.user_id as UserId,
890            is_host: message.is_host,
891        })
892    }
893}