user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use cloud_llm_client::{
  5    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
  6    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  7};
  8use collections::{HashMap, HashSet, hash_map::Entry};
  9use derive_more::Deref;
 10use feature_flags::FeatureFlagAppExt;
 11use futures::{Future, StreamExt, channel::mpsc};
 12use gpui::{
 13    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 14};
 15use http_client::http::{HeaderMap, HeaderValue};
 16use postage::{sink::Sink, watch};
 17use rpc::proto::{RequestMessage, UsersResponse};
 18use std::{
 19    str::FromStr as _,
 20    sync::{Arc, Weak},
 21};
 22use text::ReplicaId;
 23use util::TryFutureExt as _;
 24
/// Numeric identifier of a user (see [`User::id`]).
pub type UserId = u64;
 26
/// Newtype wrapper around the `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 31
 32impl std::fmt::Display for ChannelId {
 33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 34        self.0.fmt(f)
 35    }
 36}
 37
/// Newtype wrapper around the `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
 40
 41impl ProjectId {
 42    pub fn to_proto(&self) -> u64 {
 43        self.0
 44    }
 45}
 46
/// Newtype wrapper around the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
 51
/// Newtype wrapper around a participant's `u32` index; stored per user id in
/// [`UserStore`]'s participant-index map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 54
/// A user as cached by [`UserStore`], built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user; the key of the store's user cache.
    pub id: UserId,
    /// GitHub login; used as the sort key for contact and request lists.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional display name.
    pub name: Option<String>,
}
 62
/// A collaborator, described by peer, replica, and user ids plus optional
/// committer identity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Committer name, if one was provided.
    pub committer_name: Option<String>,
    /// Committer email, if one was provided.
    pub committer_email: Option<String>,
}
 72
impl PartialOrd for User {
    /// Delegates to [`Ord::cmp`], so the partial and total orders always agree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
 78
 79impl Ord for User {
 80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 81        self.github_login.cmp(&other.github_login)
 82    }
 83}
 84
 85impl PartialEq for User {
 86    fn eq(&self, other: &Self) -> bool {
 87        self.id == other.id && self.github_login == other.github_login
 88    }
 89}
 90
// Marker impl: the `PartialEq` impl for `User` compares `id` and `github_login`.
impl Eq for User {}
 92
/// A contact of the current user, together with its presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the server's contact message.
    pub online: bool,
    /// `busy` flag copied from the server's contact message.
    pub busy: bool,
}
 99
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any pending request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
107
/// Caches users and tracks the current user's contacts, contact requests,
/// plan information, and terms-of-service acceptance.
pub struct UserStore {
    /// Cache of known users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Secondary index from GitHub login to user id.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent plan update, if any was received.
    current_plan: Option<proto::Plan>,
    /// Trial start time from the most recent plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Usage-based-billing flag from the most recent plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// "Account too young" flag from the most recent plan update.
    account_too_young: Option<bool>,
    /// Watch channel holding the current user (`None` when signed out).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: acceptance state not known; inner `None`: known and not
    /// accepted; inner `Some`: time of acceptance.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite information from the most recent invite-info update.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client.
    client: Weak<Client>,
    /// Background task that applies queued contact updates.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, for spawning work without a `Context<Self>`.
    weak_self: WeakEntity<Self>,
}
129
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
135
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`; `kind` says what happened.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when the participant-index map is replaced with a different one.
    ParticipantIndicesChanged,
    /// Emitted when terms-of-service state is updated or cleared on sign-out.
    PrivateUserInfoUpdated,
    /// Emitted after a plan update message has been applied.
    PlanUpdated,
}
146
/// What happened in an [`Event::Contact`] event.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
153
// Lets `UserStore` emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
155
/// Messages queued on `UserStore::update_contacts_tx` and handled one at a
/// time by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, signalling that every message
    /// queued before this one has been handled.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
161
/// [`RequestUsage`] for model requests; dereferences to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
164
/// [`RequestUsage`] for edit predictions; dereferences to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
167
/// A usage amount paired with its limit.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that applies to this kind of usage.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
173
174impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client`, and
    /// spawns two background tasks:
    ///
    /// * `_maintain_contacts` applies queued [`UpdateContacts`] messages one
    ///   at a time, in the order they were sent.
    /// * `_maintain_current_user` watches the client's status stream and keeps
    ///   the current user, terms-of-service state, and contacts in sync.
    ///
    /// Only a weak reference to `client` is retained.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            trial_started_at: None,
            is_usage_based_billing_enabled: None,
            account_too_young: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moved into the task so the subscriptions live as long as it does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Each update is awaited before the next message is taken,
                    // so updates are applied strictly in order.
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference so this task does not keep the
                // client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the user record and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting ZED_IGNORE_ACCEPTED_TOS
                                            // makes the store behave as if the terms were
                                            // never accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // `user` is `None` if fetching it failed (the error was logged).
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
298
    /// Empties the user cache and the GitHub-login index. Only compiled with
    /// the `test-support` feature.
    #[cfg(feature = "test-support")]
    pub fn clear_cache(&mut self) {
        self.users.clear();
        self.by_github_login.clear();
    }
304
305    async fn handle_update_invite_info(
306        this: Entity<Self>,
307        message: TypedEnvelope<proto::UpdateInviteInfo>,
308        mut cx: AsyncApp,
309    ) -> Result<()> {
310        this.update(&mut cx, |this, cx| {
311            this.invite_info = Some(InviteInfo {
312                url: Arc::from(message.payload.url),
313                count: message.payload.count,
314            });
315            cx.notify();
316        })?;
317        Ok(())
318    }
319
    /// RPC handler for `proto::ShowContacts`: emits [`Event::ShowContacts`].
    /// The message payload is ignored.
    async fn handle_show_contacts(
        this: Entity<Self>,
        _: TypedEnvelope<proto::ShowContacts>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
        Ok(())
    }
328
    /// Returns the most recently received invite info, if any.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
332
    /// RPC handler for `proto::UpdateContacts`: queues the payload on the
    /// contact-update channel rather than applying it here.
    ///
    /// # Panics
    ///
    /// Panics if the channel's receiver has been dropped.
    async fn handle_update_contacts(
        this: Entity<Self>,
        message: TypedEnvelope<proto::UpdateContacts>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        this.read_with(&mut cx, |this, _| {
            this.update_contacts_tx
                .unbounded_send(UpdateContacts::Update(message.payload))
                .unwrap();
        })?;
        Ok(())
    }
345
346    async fn handle_update_plan(
347        this: Entity<Self>,
348        message: TypedEnvelope<proto::UpdateUserPlan>,
349        mut cx: AsyncApp,
350    ) -> Result<()> {
351        this.update(&mut cx, |this, cx| {
352            this.current_plan = Some(message.payload.plan());
353            this.trial_started_at = message
354                .payload
355                .trial_started_at
356                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
357            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
358            this.account_too_young = message.payload.account_too_young;
359
360            cx.emit(Event::PlanUpdated);
361            cx.notify();
362        })?;
363        Ok(())
364    }
365
    /// Handles one queued [`UpdateContacts`] message.
    ///
    /// * `Wait` only drops its barrier.
    /// * `Clear` empties the contacts and both request lists, then drops its
    ///   barrier.
    /// * `Update` loads every user the message references, then applies the
    ///   removals and insertions to the three login-sorted lists and notifies
    ///   observers.
    ///
    /// The returned task fails if a referenced user cannot be loaded or if the
    /// store is dropped before the update is applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the message mentions so they can be
                // loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        // (binary search keeps the list sorted by GitHub login)
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a
                        // `Cancelled` event for each one removed
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
485
    /// Returns the current contacts, sorted by GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
489
    /// Returns whether a contact with the same GitHub login as `user` exists.
    /// The lookup is a binary search on login; the user id is not compared.
    pub fn has_contact(&self, user: &Arc<User>) -> bool {
        self.contacts
            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
            .is_ok()
    }
495
    /// Returns the users who have sent the current user a contact request,
    /// sorted by GitHub login.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
499
    /// Returns the users the current user has sent a contact request to,
    /// sorted by GitHub login.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
503
    /// Returns whether at least one contact request concerning `user` is
    /// still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
507
508    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
509        if self
510            .contacts
511            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
512            .is_ok()
513        {
514            ContactRequestStatus::RequestAccepted
515        } else if self
516            .outgoing_contact_requests
517            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
518            .is_ok()
519        {
520            ContactRequestStatus::RequestSent
521        } else if self
522            .incoming_contact_requests
523            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
524            .is_ok()
525        {
526            ContactRequestStatus::RequestReceived
527        } else {
528            ContactRequestStatus::None
529        }
530    }
531
    /// Sends a `proto::RequestContact` for `responder_id`. While the request
    /// is in flight the user is reported as pending by
    /// `is_contact_request_pending`.
    pub fn request_contact(
        &mut self,
        responder_id: u64,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
    }
539
    /// Sends a `proto::RemoveContact` for `user_id`, tracking it as a pending
    /// contact request while it is in flight.
    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
    }
543
    /// Returns whether the user with `user_id` has an incoming contact request.
    /// This is a linear scan by id, because the list is sorted by login.
    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
        self.incoming_contact_requests
            .iter()
            .any(|user| user.id == user_id)
    }
549
550    pub fn respond_to_contact_request(
551        &mut self,
552        requester_id: u64,
553        accept: bool,
554        cx: &mut Context<Self>,
555    ) -> Task<Result<()>> {
556        self.perform_contact_request(
557            requester_id,
558            proto::RespondToContactRequest {
559                requester_id,
560                response: if accept {
561                    proto::ContactRequestResponse::Accept
562                } else {
563                    proto::ContactRequestResponse::Decline
564                } as i32,
565            },
566            cx,
567        )
568    }
569
570    pub fn dismiss_contact_request(
571        &self,
572        requester_id: u64,
573        cx: &Context<Self>,
574    ) -> Task<Result<()>> {
575        let client = self.client.upgrade();
576        cx.spawn(async move |_, _| {
577            client
578                .context("can't upgrade client reference")?
579                .request(proto::RespondToContactRequest {
580                    requester_id,
581                    response: proto::ContactRequestResponse::Dismiss as i32,
582                })
583                .await?;
584            Ok(())
585        })
586    }
587
588    fn perform_contact_request<T: RequestMessage>(
589        &mut self,
590        user_id: u64,
591        request: T,
592        cx: &mut Context<Self>,
593    ) -> Task<Result<()>> {
594        let client = self.client.upgrade();
595        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
596        cx.notify();
597
598        cx.spawn(async move |this, cx| {
599            let response = client
600                .context("can't upgrade client reference")?
601                .request(request)
602                .await;
603            this.update(cx, |this, cx| {
604                if let Entry::Occupied(mut request_count) =
605                    this.pending_contact_requests.entry(user_id)
606                {
607                    *request_count.get_mut() -= 1;
608                    if *request_count.get() == 0 {
609                        request_count.remove();
610                    }
611                }
612                cx.notify();
613            })?;
614            response?;
615            Ok(())
616        })
617    }
618
    /// Queues a `Clear` message on the contact-update channel and returns a
    /// future that completes once the queue has handled it, i.e. after the
    /// contacts and both request lists have been emptied.
    ///
    /// # Panics
    ///
    /// Panics if the channel's receiver has been dropped.
    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
        let (tx, mut rx) = postage::barrier::channel();
        self.update_contacts_tx
            .unbounded_send(UpdateContacts::Clear(tx))
            .unwrap();
        async move {
            // Resolves when the sender half is dropped by `update_contacts`.
            rx.next().await;
        }
    }
628
    /// Queues a `Wait` message on the contact-update channel and returns a
    /// future that completes once every message queued before it has been
    /// handled.
    ///
    /// # Panics
    ///
    /// Panics if the channel's receiver has been dropped.
    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
        let (tx, mut rx) = postage::barrier::channel();
        self.update_contacts_tx
            .unbounded_send(UpdateContacts::Wait(tx))
            .unwrap();
        async move {
            // Resolves when the sender half is dropped by `update_contacts`.
            rx.next().await;
        }
    }
638
639    pub fn get_users(
640        &self,
641        user_ids: Vec<u64>,
642        cx: &Context<Self>,
643    ) -> Task<Result<Vec<Arc<User>>>> {
644        let mut user_ids_to_fetch = user_ids.clone();
645        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
646
647        cx.spawn(async move |this, cx| {
648            if !user_ids_to_fetch.is_empty() {
649                this.update(cx, |this, cx| {
650                    this.load_users(
651                        proto::GetUsers {
652                            user_ids: user_ids_to_fetch,
653                        },
654                        cx,
655                    )
656                })?
657                .await?;
658            }
659
660            this.read_with(cx, |this, _| {
661                user_ids
662                    .iter()
663                    .map(|user_id| {
664                        this.users
665                            .get(user_id)
666                            .cloned()
667                            .with_context(|| format!("user {user_id} not found"))
668                    })
669                    .collect()
670            })?
671        })
672    }
673
    /// Sends a `proto::FuzzySearchUsers` request for `query` and returns the
    /// users from the response, which are also inserted into the cache.
    pub fn fuzzy_search_users(
        &self,
        query: String,
        cx: &Context<Self>,
    ) -> Task<Result<Vec<Arc<User>>>> {
        self.load_users(proto::FuzzySearchUsers { query }, cx)
    }
681
    /// Returns the cached user with `user_id`, without contacting the server.
    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
        self.users.get(&user_id).cloned()
    }
685
686    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
687        if let Some(user) = self.users.get(&user_id).cloned() {
688            return Some(user);
689        }
690
691        self.get_user(user_id, cx).detach_and_log_err(cx);
692        None
693    }
694
695    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
696        if let Some(user) = self.users.get(&user_id).cloned() {
697            return Task::ready(Ok(user));
698        }
699
700        let load_users = self.get_users(vec![user_id], cx);
701        cx.spawn(async move |this, cx| {
702            load_users.await?;
703            this.read_with(cx, |this, _| {
704                this.users
705                    .get(&user_id)
706                    .cloned()
707                    .context("server responded with no users")
708            })?
709        })
710    }
711
712    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
713        self.by_github_login
714            .get(github_login)
715            .and_then(|id| self.users.get(id).cloned())
716    }
717
    /// Returns the value currently held by the current-user watch channel
    /// (`None` when no user is set).
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
721
    /// Returns the plan from the most recent plan update, if any.
    ///
    /// In debug builds, the `ZED_SIMULATE_PLAN` environment variable overrides
    /// the stored plan; accepted values are `free`, `trial`, and `pro`.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `ZED_SIMULATE_PLAN` is set to any other
    /// value.
    pub fn current_plan(&self) -> Option<proto::Plan> {
        #[cfg(debug_assertions)]
        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
            return match plan.as_str() {
                "free" => Some(proto::Plan::Free),
                "trial" => Some(proto::Plan::ZedProTrial),
                "pro" => Some(proto::Plan::ZedPro),
                _ => {
                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
                }
            };
        }

        self.current_plan
    }
737
    /// Returns the trial start time from the most recent plan update, if any.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
741
    /// Returns the usage-based-billing flag from the most recent plan update,
    /// or `None` if no value has been received.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
745
    /// Returns a clone of the watch receiver that tracks the current user.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
749
    /// Returns whether the user's account is too new to use the service.
    ///
    /// Defaults to `false` when no value has been received.
    pub fn account_too_young(&self) -> bool {
        self.account_too_young.unwrap_or(false)
    }
754
755    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
756        self.accepted_tos_at
757            .map(|accepted_tos_at| accepted_tos_at.is_some())
758    }
759
760    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
761        if self.current_user().is_none() {
762            return Task::ready(Err(anyhow!("no current user")));
763        };
764
765        let client = self.client.clone();
766        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
767            let client = client.upgrade().context("client not found")?;
768            let response = client
769                .request(proto::AcceptTermsOfService {})
770                .await
771                .context("error accepting tos")?;
772            this.update(cx, |this, cx| {
773                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
774                cx.emit(Event::PrivateUserInfoUpdated);
775            })?;
776            Ok(())
777        })
778    }
779
780    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
781        self.accepted_tos_at = Some(
782            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
783        );
784    }
785
786    fn load_users(
787        &self,
788        request: impl RequestMessage<Response = UsersResponse>,
789        cx: &Context<Self>,
790    ) -> Task<Result<Vec<Arc<User>>>> {
791        let client = self.client.clone();
792        cx.spawn(async move |this, cx| {
793            if let Some(rpc) = client.upgrade() {
794                let response = rpc.request(request).await.context("error loading users")?;
795                let users = response.users;
796
797                this.update(cx, |this, _| this.insert(users))
798            } else {
799                Ok(Vec::new())
800            }
801        })
802    }
803
804    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
805        let mut ret = Vec::with_capacity(users.len());
806        for user in users {
807            let user = User::new(user);
808            if let Some(old) = self.users.insert(user.id, user.clone()) {
809                if old.github_login != user.github_login {
810                    self.by_github_login.remove(&old.github_login);
811                }
812            }
813            self.by_github_login
814                .insert(user.github_login.clone(), user.id);
815            ret.push(user)
816        }
817        ret
818    }
819
820    pub fn set_participant_indices(
821        &mut self,
822        participant_indices: HashMap<u64, ParticipantIndex>,
823        cx: &mut Context<Self>,
824    ) {
825        if participant_indices != self.participant_indices {
826            self.participant_indices = participant_indices;
827            cx.emit(Event::ParticipantIndicesChanged);
828        }
829    }
830
    /// Returns the current participant-index map, keyed by user id.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
834
835    pub fn participant_names(
836        &self,
837        user_ids: impl Iterator<Item = u64>,
838        cx: &App,
839    ) -> HashMap<u64, SharedString> {
840        let mut ret = HashMap::default();
841        let mut missing_user_ids = Vec::new();
842        for id in user_ids {
843            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
844                ret.insert(id, github_login);
845            } else {
846                missing_user_ids.push(id)
847            }
848        }
849        if !missing_user_ids.is_empty() {
850            let this = self.weak_self.clone();
851            cx.spawn(async move |cx| {
852                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
853                    .await
854            })
855            .detach_and_log_err(cx);
856        }
857        ret
858    }
859}
860
861impl User {
862    fn new(message: proto::User) -> Arc<Self> {
863        Arc::new(User {
864            id: message.id,
865            github_login: message.github_login.into(),
866            avatar_uri: message.avatar_url.into(),
867            name: message.name,
868        })
869    }
870}
871
872impl Contact {
873    async fn from_proto(
874        contact: proto::Contact,
875        user_store: &Entity<UserStore>,
876        cx: &mut AsyncApp,
877    ) -> Result<Self> {
878        let user = user_store
879            .update(cx, |user_store, cx| {
880                user_store.get_user(contact.user_id, cx)
881            })?
882            .await?;
883        Ok(Self {
884            user,
885            online: contact.online,
886            busy: contact.busy,
887        })
888    }
889}
890
891impl Collaborator {
892    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
893        Ok(Self {
894            peer_id: message.peer_id.context("invalid peer id")?,
895            replica_id: message.replica_id as ReplicaId,
896            user_id: message.user_id as UserId,
897            is_host: message.is_host,
898            committer_name: message.committer_name,
899            committer_email: message.committer_email,
900        })
901    }
902}
903
904impl RequestUsage {
905    pub fn over_limit(&self) -> bool {
906        match self.limit {
907            UsageLimit::Limited(limit) => self.amount >= limit,
908            UsageLimit::Unlimited => false,
909        }
910    }
911
912    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
913        let limit = match limit.variant? {
914            proto::usage_limit::Variant::Limited(limited) => {
915                UsageLimit::Limited(limited.limit as i32)
916            }
917            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
918        };
919        Some(RequestUsage {
920            limit,
921            amount: amount as i32,
922        })
923    }
924
925    fn from_headers(
926        limit_name: &str,
927        amount_name: &str,
928        headers: &HeaderMap<HeaderValue>,
929    ) -> Result<Self> {
930        let limit = headers
931            .get(limit_name)
932            .with_context(|| format!("missing {limit_name:?} header"))?;
933        let limit = UsageLimit::from_str(limit.to_str()?)?;
934
935        let amount = headers
936            .get(amount_name)
937            .with_context(|| format!("missing {amount_name:?} header"))?;
938        let amount = amount.to_str()?.parse::<i32>()?;
939
940        Ok(Self { limit, amount })
941    }
942}
943
944impl ModelRequestUsage {
945    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
946        Ok(Self(RequestUsage::from_headers(
947            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
948            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
949            headers,
950        )?))
951    }
952}
953
954impl EditPredictionUsage {
955    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
956        Ok(Self(RequestUsage::from_headers(
957            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
958            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
959            headers,
960        )?))
961    }
962}