user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use collections::{HashMap, HashSet, hash_map::Entry};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{Future, StreamExt, channel::mpsc};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::{TryFutureExt as _, maybe};
 15
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
 17
/// Newtype wrapper around the `u64` id of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
 29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 30pub struct ProjectId(pub u64);
 31
 32impl ProjectId {
 33    pub fn to_proto(&self) -> u64 {
 34        self.0
 35    }
 36}
 37
/// Newtype wrapper around the `u64` id of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
 42
/// Index assigned to a participant; stored per user id in
/// `UserStore::participant_indices`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 45
/// A user as known to the client, built from a `proto::User` message.
///
/// Equality considers `id` and `github_login`; see the `PartialEq` and `Ord`
/// impls for the exact comparison rules.
#[derive(Default, Debug)]
pub struct User {
    /// Unique id of the user.
    pub id: UserId,
    /// GitHub login name; contact lists are kept sorted by this field.
    pub github_login: String,
    /// Avatar location, converted from the message's `avatar_url`.
    pub avatar_uri: SharedUri,
    /// Optional name, copied from the message.
    pub name: Option<String>,
    /// Optional email address, copied from the message.
    pub email: Option<String>,
}
 54
/// A collaborator entry, converted from `proto::Collaborator` by
/// [`Collaborator::from_proto`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; conversion fails when it is absent.
    pub peer_id: proto::PeerId,
    /// Replica id taken from the message.
    pub replica_id: ReplicaId,
    /// Id of the user this collaborator corresponds to.
    pub user_id: UserId,
    /// Host flag copied from the message.
    pub is_host: bool,
}
 62
 63impl PartialOrd for User {
 64    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 65        Some(self.cmp(other))
 66    }
 67}
 68
 69impl Ord for User {
 70    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 71        self.github_login.cmp(&other.github_login)
 72    }
 73}
 74
 75impl PartialEq for User {
 76    fn eq(&self, other: &Self) -> bool {
 77        self.id == other.id && self.github_login == other.github_login
 78    }
 79}
 80
 81impl Eq for User {}
 82
/// A contact of the current user, built from `proto::Contact`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the message.
    pub online: bool,
    /// `busy` flag copied from the message.
    pub busy: bool,
}
 89
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
 97
/// Client-side store of users, contacts, and plan/usage information.
///
/// State is updated by RPC message handlers registered in `UserStore::new` and
/// by the two background tasks held in `_maintain_contacts` and
/// `_maintain_current_user`.
pub struct UserStore {
    /// Cached users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id, replaced via `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the latest `UpdateUserPlan` message, if one was received.
    current_plan: Option<proto::Plan>,
    /// Start and end of the subscription period from the latest plan update.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Trial start time from the latest plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Model request usage amount from the latest plan update that carried usage.
    model_request_usage_amount: Option<u32>,
    /// Model request usage limit from the latest plan update that carried usage.
    model_request_usage_limit: Option<proto::UsageLimit>,
    /// Edit prediction usage amount from the latest plan update that carried usage.
    edit_predictions_usage_amount: Option<u32>,
    /// Edit prediction usage limit from the latest plan update that carried usage.
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    /// Usage-based billing flag from the latest plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// Receiver for the current user, written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: not known yet (or signed out). Inner `None`: known, and
    /// no acceptance timestamp is available.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (looked up with binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent the current user a contact request, sorted by login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users the current user sent a contact request to, sorted by login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the latest `UpdateInviteInfo` message.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&self` methods.
    weak_self: WeakEntity<Self>,
}
123
/// Invite information received through `proto::UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` value copied from the message.
    pub count: u32,
    /// `url` value copied from the message.
    pub url: Arc<str>,
}
129
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened regarding a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` replaces the stored indices.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service acceptance state is set or reset.
    PrivateUserInfoUpdated,
}
139
/// The kind of change reported by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when a contacts update removes an incoming contact request.
    Cancelled,
}
146
// Allows `UserStore` to emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
148
/// Messages processed sequentially by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached,
    /// which lets a waiter know that all earlier messages were processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
154
155impl UserStore {
156    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
157        let (mut current_user_tx, current_user_rx) = watch::channel();
158        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
159        let rpc_subscriptions = vec![
160            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
161            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
162            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
163            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
164        ];
165        Self {
166            users: Default::default(),
167            by_github_login: Default::default(),
168            current_user: current_user_rx,
169            current_plan: None,
170            subscription_period: None,
171            trial_started_at: None,
172            model_request_usage_amount: None,
173            model_request_usage_limit: None,
174            edit_predictions_usage_amount: None,
175            edit_predictions_usage_limit: None,
176            is_usage_based_billing_enabled: None,
177            accepted_tos_at: None,
178            contacts: Default::default(),
179            incoming_contact_requests: Default::default(),
180            participant_indices: Default::default(),
181            outgoing_contact_requests: Default::default(),
182            invite_info: None,
183            client: Arc::downgrade(&client),
184            update_contacts_tx,
185            _maintain_contacts: cx.spawn(async move |this, cx| {
186                let _subscriptions = rpc_subscriptions;
187                while let Some(message) = update_contacts_rx.next().await {
188                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
189                    {
190                        task.log_err().await;
191                    } else {
192                        break;
193                    }
194                }
195            }),
196            _maintain_current_user: cx.spawn(async move |this, cx| {
197                let mut status = client.status();
198                let weak = Arc::downgrade(&client);
199                drop(client);
200                while let Some(status) = status.next().await {
201                    // if the client is dropped, the app is shutting down.
202                    let Some(client) = weak.upgrade() else {
203                        return Ok(());
204                    };
205                    match status {
206                        Status::Connected { .. } => {
207                            if let Some(user_id) = client.user_id() {
208                                let fetch_user = if let Ok(fetch_user) =
209                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
210                                {
211                                    fetch_user
212                                } else {
213                                    break;
214                                };
215                                let fetch_private_user_info =
216                                    client.request(proto::GetPrivateUserInfo {}).log_err();
217                                let (user, info) =
218                                    futures::join!(fetch_user, fetch_private_user_info);
219
220                                cx.update(|cx| {
221                                    if let Some(info) = info {
222                                        let staff =
223                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
224                                        cx.update_flags(staff, info.flags);
225                                        client.telemetry.set_authenticated_user_info(
226                                            Some(info.metrics_id.clone()),
227                                            staff,
228                                        );
229
230                                        this.update(cx, |this, cx| {
231                                            let accepted_tos_at = {
232                                                #[cfg(debug_assertions)]
233                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
234                                                {
235                                                    None
236                                                } else {
237                                                    info.accepted_tos_at
238                                                }
239
240                                                #[cfg(not(debug_assertions))]
241                                                info.accepted_tos_at
242                                            };
243
244                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
245                                            cx.emit(Event::PrivateUserInfoUpdated);
246                                        })
247                                    } else {
248                                        anyhow::Ok(())
249                                    }
250                                })??;
251
252                                current_user_tx.send(user).await.ok();
253
254                                this.update(cx, |_, cx| cx.notify())?;
255                            }
256                        }
257                        Status::SignedOut => {
258                            current_user_tx.send(None).await.ok();
259                            this.update(cx, |this, cx| {
260                                this.accepted_tos_at = None;
261                                cx.emit(Event::PrivateUserInfoUpdated);
262                                cx.notify();
263                                this.clear_contacts()
264                            })?
265                            .await;
266                        }
267                        Status::ConnectionLost => {
268                            this.update(cx, |this, cx| {
269                                cx.notify();
270                                this.clear_contacts()
271                            })?
272                            .await;
273                        }
274                        _ => {}
275                    }
276                }
277                Ok(())
278            }),
279            pending_contact_requests: Default::default(),
280            weak_self: cx.weak_entity(),
281        }
282    }
283
284    #[cfg(feature = "test-support")]
285    pub fn clear_cache(&mut self) {
286        self.users.clear();
287        self.by_github_login.clear();
288    }
289
290    async fn handle_update_invite_info(
291        this: Entity<Self>,
292        message: TypedEnvelope<proto::UpdateInviteInfo>,
293        mut cx: AsyncApp,
294    ) -> Result<()> {
295        this.update(&mut cx, |this, cx| {
296            this.invite_info = Some(InviteInfo {
297                url: Arc::from(message.payload.url),
298                count: message.payload.count,
299            });
300            cx.notify();
301        })?;
302        Ok(())
303    }
304
305    async fn handle_show_contacts(
306        this: Entity<Self>,
307        _: TypedEnvelope<proto::ShowContacts>,
308        mut cx: AsyncApp,
309    ) -> Result<()> {
310        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
311        Ok(())
312    }
313
    /// Returns the invite info from the latest `UpdateInviteInfo` message, or
    /// `None` if none has been received.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
317
318    async fn handle_update_contacts(
319        this: Entity<Self>,
320        message: TypedEnvelope<proto::UpdateContacts>,
321        mut cx: AsyncApp,
322    ) -> Result<()> {
323        this.update(&mut cx, |this, _| {
324            this.update_contacts_tx
325                .unbounded_send(UpdateContacts::Update(message.payload))
326                .unwrap();
327        })?;
328        Ok(())
329    }
330
331    async fn handle_update_plan(
332        this: Entity<Self>,
333        message: TypedEnvelope<proto::UpdateUserPlan>,
334        mut cx: AsyncApp,
335    ) -> Result<()> {
336        this.update(&mut cx, |this, cx| {
337            this.current_plan = Some(message.payload.plan());
338            this.subscription_period = maybe!({
339                let period = message.payload.subscription_period?;
340                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
341                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
342
343                Some((started_at, ended_at))
344            });
345            this.trial_started_at = message
346                .payload
347                .trial_started_at
348                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
349            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
350
351            if let Some(usage) = message.payload.usage {
352                this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
353                this.model_request_usage_limit = usage.model_requests_usage_limit;
354                this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
355                this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
356            }
357
358            cx.notify();
359        })?;
360        Ok(())
361    }
362
363    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
364        match message {
365            UpdateContacts::Wait(barrier) => {
366                drop(barrier);
367                Task::ready(Ok(()))
368            }
369            UpdateContacts::Clear(barrier) => {
370                self.contacts.clear();
371                self.incoming_contact_requests.clear();
372                self.outgoing_contact_requests.clear();
373                drop(barrier);
374                Task::ready(Ok(()))
375            }
376            UpdateContacts::Update(message) => {
377                let mut user_ids = HashSet::default();
378                for contact in &message.contacts {
379                    user_ids.insert(contact.user_id);
380                }
381                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
382                user_ids.extend(message.outgoing_requests.iter());
383
384                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
385                cx.spawn(async move |this, cx| {
386                    load_users.await?;
387
388                    // Users are fetched in parallel above and cached in call to get_users
389                    // No need to parallelize here
390                    let mut updated_contacts = Vec::new();
391                    let this = this.upgrade().context("can't upgrade user store handle")?;
392                    for contact in message.contacts {
393                        updated_contacts
394                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
395                    }
396
397                    let mut incoming_requests = Vec::new();
398                    for request in message.incoming_requests {
399                        incoming_requests.push({
400                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
401                                .await?
402                        });
403                    }
404
405                    let mut outgoing_requests = Vec::new();
406                    for requested_user_id in message.outgoing_requests {
407                        outgoing_requests.push(
408                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
409                                .await?,
410                        );
411                    }
412
413                    let removed_contacts =
414                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
415                    let removed_incoming_requests =
416                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
417                    let removed_outgoing_requests =
418                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
419
420                    this.update(cx, |this, cx| {
421                        // Remove contacts
422                        this.contacts
423                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
424                        // Update existing contacts and insert new ones
425                        for updated_contact in updated_contacts {
426                            match this.contacts.binary_search_by_key(
427                                &&updated_contact.user.github_login,
428                                |contact| &contact.user.github_login,
429                            ) {
430                                Ok(ix) => this.contacts[ix] = updated_contact,
431                                Err(ix) => this.contacts.insert(ix, updated_contact),
432                            }
433                        }
434
435                        // Remove incoming contact requests
436                        this.incoming_contact_requests.retain(|user| {
437                            if removed_incoming_requests.contains(&user.id) {
438                                cx.emit(Event::Contact {
439                                    user: user.clone(),
440                                    kind: ContactEventKind::Cancelled,
441                                });
442                                false
443                            } else {
444                                true
445                            }
446                        });
447                        // Update existing incoming requests and insert new ones
448                        for user in incoming_requests {
449                            match this
450                                .incoming_contact_requests
451                                .binary_search_by_key(&&user.github_login, |contact| {
452                                    &contact.github_login
453                                }) {
454                                Ok(ix) => this.incoming_contact_requests[ix] = user,
455                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
456                            }
457                        }
458
459                        // Remove outgoing contact requests
460                        this.outgoing_contact_requests
461                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
462                        // Update existing incoming requests and insert new ones
463                        for request in outgoing_requests {
464                            match this
465                                .outgoing_contact_requests
466                                .binary_search_by_key(&&request.github_login, |contact| {
467                                    &contact.github_login
468                                }) {
469                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
470                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
471                            }
472                        }
473
474                        cx.notify();
475                    })?;
476
477                    Ok(())
478                })
479            }
480        }
481    }
482
    /// Returns the current contacts, sorted by GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
486
487    pub fn has_contact(&self, user: &Arc<User>) -> bool {
488        self.contacts
489            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
490            .is_ok()
491    }
492
    /// Returns the users who sent a contact request, sorted by GitHub login.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
496
    /// Returns the users a contact request was sent to, sorted by GitHub login.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
500
    /// Returns whether at least one contact-related request for `user` is
    /// still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
504
505    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
506        if self
507            .contacts
508            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
509            .is_ok()
510        {
511            ContactRequestStatus::RequestAccepted
512        } else if self
513            .outgoing_contact_requests
514            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
515            .is_ok()
516        {
517            ContactRequestStatus::RequestSent
518        } else if self
519            .incoming_contact_requests
520            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
521            .is_ok()
522        {
523            ContactRequestStatus::RequestReceived
524        } else {
525            ContactRequestStatus::None
526        }
527    }
528
529    pub fn request_contact(
530        &mut self,
531        responder_id: u64,
532        cx: &mut Context<Self>,
533    ) -> Task<Result<()>> {
534        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
535    }
536
537    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
538        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
539    }
540
541    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
542        self.incoming_contact_requests
543            .iter()
544            .any(|user| user.id == user_id)
545    }
546
547    pub fn respond_to_contact_request(
548        &mut self,
549        requester_id: u64,
550        accept: bool,
551        cx: &mut Context<Self>,
552    ) -> Task<Result<()>> {
553        self.perform_contact_request(
554            requester_id,
555            proto::RespondToContactRequest {
556                requester_id,
557                response: if accept {
558                    proto::ContactRequestResponse::Accept
559                } else {
560                    proto::ContactRequestResponse::Decline
561                } as i32,
562            },
563            cx,
564        )
565    }
566
567    pub fn dismiss_contact_request(
568        &self,
569        requester_id: u64,
570        cx: &Context<Self>,
571    ) -> Task<Result<()>> {
572        let client = self.client.upgrade();
573        cx.spawn(async move |_, _| {
574            client
575                .context("can't upgrade client reference")?
576                .request(proto::RespondToContactRequest {
577                    requester_id,
578                    response: proto::ContactRequestResponse::Dismiss as i32,
579                })
580                .await?;
581            Ok(())
582        })
583    }
584
585    fn perform_contact_request<T: RequestMessage>(
586        &mut self,
587        user_id: u64,
588        request: T,
589        cx: &mut Context<Self>,
590    ) -> Task<Result<()>> {
591        let client = self.client.upgrade();
592        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
593        cx.notify();
594
595        cx.spawn(async move |this, cx| {
596            let response = client
597                .context("can't upgrade client reference")?
598                .request(request)
599                .await;
600            this.update(cx, |this, cx| {
601                if let Entry::Occupied(mut request_count) =
602                    this.pending_contact_requests.entry(user_id)
603                {
604                    *request_count.get_mut() -= 1;
605                    if *request_count.get() == 0 {
606                        request_count.remove();
607                    }
608                }
609                cx.notify();
610            })?;
611            response?;
612            Ok(())
613        })
614    }
615
616    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
617        let (tx, mut rx) = postage::barrier::channel();
618        self.update_contacts_tx
619            .unbounded_send(UpdateContacts::Clear(tx))
620            .unwrap();
621        async move {
622            rx.next().await;
623        }
624    }
625
626    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
627        let (tx, mut rx) = postage::barrier::channel();
628        self.update_contacts_tx
629            .unbounded_send(UpdateContacts::Wait(tx))
630            .unwrap();
631        async move {
632            rx.next().await;
633        }
634    }
635
636    pub fn get_users(
637        &self,
638        user_ids: Vec<u64>,
639        cx: &Context<Self>,
640    ) -> Task<Result<Vec<Arc<User>>>> {
641        let mut user_ids_to_fetch = user_ids.clone();
642        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
643
644        cx.spawn(async move |this, cx| {
645            if !user_ids_to_fetch.is_empty() {
646                this.update(cx, |this, cx| {
647                    this.load_users(
648                        proto::GetUsers {
649                            user_ids: user_ids_to_fetch,
650                        },
651                        cx,
652                    )
653                })?
654                .await?;
655            }
656
657            this.update(cx, |this, _| {
658                user_ids
659                    .iter()
660                    .map(|user_id| {
661                        this.users
662                            .get(user_id)
663                            .cloned()
664                            .with_context(|| format!("user {user_id} not found"))
665                    })
666                    .collect()
667            })?
668        })
669    }
670
671    pub fn fuzzy_search_users(
672        &self,
673        query: String,
674        cx: &Context<Self>,
675    ) -> Task<Result<Vec<Arc<User>>>> {
676        self.load_users(proto::FuzzySearchUsers { query }, cx)
677    }
678
    /// Returns the user with id `user_id` if it is already cached; never
    /// contacts the server.
    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
        self.users.get(&user_id).cloned()
    }
682
683    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
684        if let Some(user) = self.users.get(&user_id).cloned() {
685            return Some(user);
686        }
687
688        self.get_user(user_id, cx).detach_and_log_err(cx);
689        None
690    }
691
692    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
693        if let Some(user) = self.users.get(&user_id).cloned() {
694            return Task::ready(Ok(user));
695        }
696
697        let load_users = self.get_users(vec![user_id], cx);
698        cx.spawn(async move |this, cx| {
699            load_users.await?;
700            this.update(cx, |this, _| {
701                this.users
702                    .get(&user_id)
703                    .cloned()
704                    .context("server responded with no users")
705            })?
706        })
707    }
708
709    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
710        self.by_github_login
711            .get(github_login)
712            .and_then(|id| self.users.get(id).cloned())
713    }
714
    /// Returns the latest value of the current-user watch channel.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
718
    /// Returns the plan from the latest plan update, if one was received.
    pub fn current_plan(&self) -> Option<proto::Plan> {
        self.current_plan
    }
722
    /// Returns the `(started_at, ended_at)` subscription period from the
    /// latest plan update, if available.
    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
        self.subscription_period
    }
726
    /// Returns the trial start time from the latest plan update, if available.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
730
    /// Returns the usage-based billing flag from the latest plan update, or
    /// `None` when it has not been reported.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
734
    /// Returns the most recently reported model request usage amount.
    pub fn model_request_usage_amount(&self) -> Option<u32> {
        self.model_request_usage_amount
    }
738
    /// Returns a copy of the most recently reported model request usage limit.
    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.model_request_usage_limit.clone()
    }
742
    /// Returns the most recently reported edit prediction usage amount.
    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
        self.edit_predictions_usage_amount
    }
746
    /// Returns a copy of the most recently reported edit prediction usage limit.
    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.edit_predictions_usage_limit.clone()
    }
750
    /// Returns a new receiver for the current-user watch channel, so callers
    /// can observe changes to the current user.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
754
755    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
756        self.accepted_tos_at
757            .map(|accepted_tos_at| accepted_tos_at.is_some())
758    }
759
760    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
761        if self.current_user().is_none() {
762            return Task::ready(Err(anyhow!("no current user")));
763        };
764
765        let client = self.client.clone();
766        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
767            let client = client.upgrade().context("client not found")?;
768            let response = client
769                .request(proto::AcceptTermsOfService {})
770                .await
771                .context("error accepting tos")?;
772            this.update(cx, |this, cx| {
773                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
774                cx.emit(Event::PrivateUserInfoUpdated);
775            })?;
776            Ok(())
777        })
778    }
779
780    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
781        self.accepted_tos_at = Some(
782            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
783        );
784    }
785
786    fn load_users(
787        &self,
788        request: impl RequestMessage<Response = UsersResponse>,
789        cx: &Context<Self>,
790    ) -> Task<Result<Vec<Arc<User>>>> {
791        let client = self.client.clone();
792        cx.spawn(async move |this, cx| {
793            if let Some(rpc) = client.upgrade() {
794                let response = rpc.request(request).await.context("error loading users")?;
795                let users = response.users;
796
797                this.update(cx, |this, _| this.insert(users))
798            } else {
799                Ok(Vec::new())
800            }
801        })
802    }
803
804    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
805        let mut ret = Vec::with_capacity(users.len());
806        for user in users {
807            let user = User::new(user);
808            if let Some(old) = self.users.insert(user.id, user.clone()) {
809                if old.github_login != user.github_login {
810                    self.by_github_login.remove(&old.github_login);
811                }
812            }
813            self.by_github_login
814                .insert(user.github_login.clone(), user.id);
815            ret.push(user)
816        }
817        ret
818    }
819
820    pub fn set_participant_indices(
821        &mut self,
822        participant_indices: HashMap<u64, ParticipantIndex>,
823        cx: &mut Context<Self>,
824    ) {
825        if participant_indices != self.participant_indices {
826            self.participant_indices = participant_indices;
827            cx.emit(Event::ParticipantIndicesChanged);
828        }
829    }
830
    /// Returns the participant index recorded for each user id.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
834
835    pub fn participant_names(
836        &self,
837        user_ids: impl Iterator<Item = u64>,
838        cx: &App,
839    ) -> HashMap<u64, SharedString> {
840        let mut ret = HashMap::default();
841        let mut missing_user_ids = Vec::new();
842        for id in user_ids {
843            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
844                ret.insert(id, github_login.into());
845            } else {
846                missing_user_ids.push(id)
847            }
848        }
849        if !missing_user_ids.is_empty() {
850            let this = self.weak_self.clone();
851            cx.spawn(async move |cx| {
852                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
853                    .await
854            })
855            .detach_and_log_err(cx);
856        }
857        ret
858    }
859}
860
861impl User {
862    fn new(message: proto::User) -> Arc<Self> {
863        Arc::new(User {
864            id: message.id,
865            github_login: message.github_login,
866            avatar_uri: message.avatar_url.into(),
867            name: message.name,
868            email: message.email,
869        })
870    }
871}
872
873impl Contact {
874    async fn from_proto(
875        contact: proto::Contact,
876        user_store: &Entity<UserStore>,
877        cx: &mut AsyncApp,
878    ) -> Result<Self> {
879        let user = user_store
880            .update(cx, |user_store, cx| {
881                user_store.get_user(contact.user_id, cx)
882            })?
883            .await?;
884        Ok(Self {
885            user,
886            online: contact.online,
887            busy: contact.busy,
888        })
889    }
890}
891
892impl Collaborator {
893    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
894        Ok(Self {
895            peer_id: message.peer_id.context("invalid peer id")?,
896            replica_id: message.replica_id as ReplicaId,
897            user_id: message.user_id as UserId,
898            is_host: message.is_host,
899        })
900    }
901}