user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use cloud_llm_client::{
  5    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
  6    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  7};
  8use collections::{HashMap, HashSet, hash_map::Entry};
  9use derive_more::Deref;
 10use feature_flags::FeatureFlagAppExt;
 11use futures::{Future, StreamExt, channel::mpsc};
 12use gpui::{
 13    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 14};
 15use http_client::http::{HeaderMap, HeaderValue};
 16use postage::{sink::Sink, watch};
 17use rpc::proto::{RequestMessage, UsersResponse};
 18use std::{
 19    str::FromStr as _,
 20    sync::{Arc, Weak},
 21};
 22use text::ReplicaId;
 23use util::{TryFutureExt as _, maybe};
 24
 25pub type UserId = u64;
 26
 27#[derive(
 28    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 29)]
 30pub struct ChannelId(pub u64);
 31
 32impl std::fmt::Display for ChannelId {
 33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 34        self.0.fmt(f)
 35    }
 36}
 37
 38#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 39pub struct ProjectId(pub u64);
 40
 41impl ProjectId {
 42    pub fn to_proto(&self) -> u64 {
 43        self.0
 44    }
 45}
 46
 47#[derive(
 48    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 49)]
 50pub struct DevServerProjectId(pub u64);
 51
 52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 53pub struct ParticipantIndex(pub u32);
 54
 55#[derive(Default, Debug)]
 56pub struct User {
 57    pub id: UserId,
 58    pub github_login: SharedString,
 59    pub avatar_uri: SharedUri,
 60    pub name: Option<String>,
 61}
 62
 63#[derive(Clone, Debug, PartialEq, Eq)]
 64pub struct Collaborator {
 65    pub peer_id: proto::PeerId,
 66    pub replica_id: ReplicaId,
 67    pub user_id: UserId,
 68    pub is_host: bool,
 69    pub committer_name: Option<String>,
 70    pub committer_email: Option<String>,
 71}
 72
 73impl PartialOrd for User {
 74    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 75        Some(self.cmp(other))
 76    }
 77}
 78
 79impl Ord for User {
 80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 81        self.github_login.cmp(&other.github_login)
 82    }
 83}
 84
 85impl PartialEq for User {
 86    fn eq(&self, other: &Self) -> bool {
 87        self.id == other.id && self.github_login == other.github_login
 88    }
 89}
 90
 91impl Eq for User {}
 92
 93#[derive(Debug, PartialEq)]
 94pub struct Contact {
 95    pub user: Arc<User>,
 96    pub online: bool,
 97    pub busy: bool,
 98}
 99
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101pub enum ContactRequestStatus {
102    None,
103    RequestSent,
104    RequestReceived,
105    RequestAccepted,
106}
107
108pub struct UserStore {
109    users: HashMap<u64, Arc<User>>,
110    by_github_login: HashMap<SharedString, u64>,
111    participant_indices: HashMap<u64, ParticipantIndex>,
112    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
113    current_plan: Option<proto::Plan>,
114    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
115    trial_started_at: Option<DateTime<Utc>>,
116    is_usage_based_billing_enabled: Option<bool>,
117    account_too_young: Option<bool>,
118    has_overdue_invoices: Option<bool>,
119    current_user: watch::Receiver<Option<Arc<User>>>,
120    accepted_tos_at: Option<Option<DateTime<Utc>>>,
121    contacts: Vec<Arc<Contact>>,
122    incoming_contact_requests: Vec<Arc<User>>,
123    outgoing_contact_requests: Vec<Arc<User>>,
124    pending_contact_requests: HashMap<u64, usize>,
125    invite_info: Option<InviteInfo>,
126    client: Weak<Client>,
127    _maintain_contacts: Task<()>,
128    _maintain_current_user: Task<Result<()>>,
129    weak_self: WeakEntity<Self>,
130}
131
132#[derive(Clone)]
133pub struct InviteInfo {
134    pub count: u32,
135    pub url: Arc<str>,
136}
137
138pub enum Event {
139    Contact {
140        user: Arc<User>,
141        kind: ContactEventKind,
142    },
143    ShowContacts,
144    ParticipantIndicesChanged,
145    PrivateUserInfoUpdated,
146    PlanUpdated,
147}
148
149#[derive(Clone, Copy)]
150pub enum ContactEventKind {
151    Requested,
152    Accepted,
153    Cancelled,
154}
155
156impl EventEmitter<Event> for UserStore {}
157
158enum UpdateContacts {
159    Update(proto::UpdateContacts),
160    Wait(postage::barrier::Sender),
161    Clear(postage::barrier::Sender),
162}
163
164#[derive(Debug, Clone, Copy, Deref)]
165pub struct ModelRequestUsage(pub RequestUsage);
166
167#[derive(Debug, Clone, Copy, Deref)]
168pub struct EditPredictionUsage(pub RequestUsage);
169
170#[derive(Debug, Clone, Copy)]
171pub struct RequestUsage {
172    pub limit: UsageLimit,
173    pub amount: i32,
174}
175
176impl UserStore {
177    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
178        let (mut current_user_tx, current_user_rx) = watch::channel();
179        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
180        let rpc_subscriptions = vec![
181            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
182            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
183            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
184            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
185        ];
186        Self {
187            users: Default::default(),
188            by_github_login: Default::default(),
189            current_user: current_user_rx,
190            current_plan: None,
191            subscription_period: None,
192            trial_started_at: None,
193            is_usage_based_billing_enabled: None,
194            account_too_young: None,
195            has_overdue_invoices: None,
196            accepted_tos_at: None,
197            contacts: Default::default(),
198            incoming_contact_requests: Default::default(),
199            participant_indices: Default::default(),
200            outgoing_contact_requests: Default::default(),
201            invite_info: None,
202            client: Arc::downgrade(&client),
203            update_contacts_tx,
204            _maintain_contacts: cx.spawn(async move |this, cx| {
205                let _subscriptions = rpc_subscriptions;
206                while let Some(message) = update_contacts_rx.next().await {
207                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
208                    {
209                        task.log_err().await;
210                    } else {
211                        break;
212                    }
213                }
214            }),
215            _maintain_current_user: cx.spawn(async move |this, cx| {
216                let mut status = client.status();
217                let weak = Arc::downgrade(&client);
218                drop(client);
219                while let Some(status) = status.next().await {
220                    // if the client is dropped, the app is shutting down.
221                    let Some(client) = weak.upgrade() else {
222                        return Ok(());
223                    };
224                    match status {
225                        Status::Connected { .. } => {
226                            if let Some(user_id) = client.user_id() {
227                                let fetch_user = if let Ok(fetch_user) =
228                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
229                                {
230                                    fetch_user
231                                } else {
232                                    break;
233                                };
234                                let fetch_private_user_info =
235                                    client.request(proto::GetPrivateUserInfo {}).log_err();
236                                let (user, info) =
237                                    futures::join!(fetch_user, fetch_private_user_info);
238
239                                cx.update(|cx| {
240                                    if let Some(info) = info {
241                                        let staff =
242                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
243                                        cx.update_flags(staff, info.flags);
244                                        client.telemetry.set_authenticated_user_info(
245                                            Some(info.metrics_id.clone()),
246                                            staff,
247                                        );
248
249                                        this.update(cx, |this, cx| {
250                                            let accepted_tos_at = {
251                                                #[cfg(debug_assertions)]
252                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
253                                                {
254                                                    None
255                                                } else {
256                                                    info.accepted_tos_at
257                                                }
258
259                                                #[cfg(not(debug_assertions))]
260                                                info.accepted_tos_at
261                                            };
262
263                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
264                                            cx.emit(Event::PrivateUserInfoUpdated);
265                                        })
266                                    } else {
267                                        anyhow::Ok(())
268                                    }
269                                })??;
270
271                                current_user_tx.send(user).await.ok();
272
273                                this.update(cx, |_, cx| cx.notify())?;
274                            }
275                        }
276                        Status::SignedOut => {
277                            current_user_tx.send(None).await.ok();
278                            this.update(cx, |this, cx| {
279                                this.accepted_tos_at = None;
280                                cx.emit(Event::PrivateUserInfoUpdated);
281                                cx.notify();
282                                this.clear_contacts()
283                            })?
284                            .await;
285                        }
286                        Status::ConnectionLost => {
287                            this.update(cx, |this, cx| {
288                                cx.notify();
289                                this.clear_contacts()
290                            })?
291                            .await;
292                        }
293                        _ => {}
294                    }
295                }
296                Ok(())
297            }),
298            pending_contact_requests: Default::default(),
299            weak_self: cx.weak_entity(),
300        }
301    }
302
303    #[cfg(feature = "test-support")]
304    pub fn clear_cache(&mut self) {
305        self.users.clear();
306        self.by_github_login.clear();
307    }
308
309    async fn handle_update_invite_info(
310        this: Entity<Self>,
311        message: TypedEnvelope<proto::UpdateInviteInfo>,
312        mut cx: AsyncApp,
313    ) -> Result<()> {
314        this.update(&mut cx, |this, cx| {
315            this.invite_info = Some(InviteInfo {
316                url: Arc::from(message.payload.url),
317                count: message.payload.count,
318            });
319            cx.notify();
320        })?;
321        Ok(())
322    }
323
324    async fn handle_show_contacts(
325        this: Entity<Self>,
326        _: TypedEnvelope<proto::ShowContacts>,
327        mut cx: AsyncApp,
328    ) -> Result<()> {
329        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
330        Ok(())
331    }
332
333    pub fn invite_info(&self) -> Option<&InviteInfo> {
334        self.invite_info.as_ref()
335    }
336
337    async fn handle_update_contacts(
338        this: Entity<Self>,
339        message: TypedEnvelope<proto::UpdateContacts>,
340        mut cx: AsyncApp,
341    ) -> Result<()> {
342        this.read_with(&mut cx, |this, _| {
343            this.update_contacts_tx
344                .unbounded_send(UpdateContacts::Update(message.payload))
345                .unwrap();
346        })?;
347        Ok(())
348    }
349
350    async fn handle_update_plan(
351        this: Entity<Self>,
352        message: TypedEnvelope<proto::UpdateUserPlan>,
353        mut cx: AsyncApp,
354    ) -> Result<()> {
355        this.update(&mut cx, |this, cx| {
356            this.current_plan = Some(message.payload.plan());
357            this.subscription_period = maybe!({
358                let period = message.payload.subscription_period?;
359                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
360                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
361
362                Some((started_at, ended_at))
363            });
364            this.trial_started_at = message
365                .payload
366                .trial_started_at
367                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
368            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
369            this.account_too_young = message.payload.account_too_young;
370            this.has_overdue_invoices = message.payload.has_overdue_invoices;
371
372            cx.emit(Event::PlanUpdated);
373            cx.notify();
374        })?;
375        Ok(())
376    }
377
378    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
379        match message {
380            UpdateContacts::Wait(barrier) => {
381                drop(barrier);
382                Task::ready(Ok(()))
383            }
384            UpdateContacts::Clear(barrier) => {
385                self.contacts.clear();
386                self.incoming_contact_requests.clear();
387                self.outgoing_contact_requests.clear();
388                drop(barrier);
389                Task::ready(Ok(()))
390            }
391            UpdateContacts::Update(message) => {
392                let mut user_ids = HashSet::default();
393                for contact in &message.contacts {
394                    user_ids.insert(contact.user_id);
395                }
396                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
397                user_ids.extend(message.outgoing_requests.iter());
398
399                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
400                cx.spawn(async move |this, cx| {
401                    load_users.await?;
402
403                    // Users are fetched in parallel above and cached in call to get_users
404                    // No need to parallelize here
405                    let mut updated_contacts = Vec::new();
406                    let this = this.upgrade().context("can't upgrade user store handle")?;
407                    for contact in message.contacts {
408                        updated_contacts
409                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
410                    }
411
412                    let mut incoming_requests = Vec::new();
413                    for request in message.incoming_requests {
414                        incoming_requests.push({
415                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
416                                .await?
417                        });
418                    }
419
420                    let mut outgoing_requests = Vec::new();
421                    for requested_user_id in message.outgoing_requests {
422                        outgoing_requests.push(
423                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
424                                .await?,
425                        );
426                    }
427
428                    let removed_contacts =
429                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
430                    let removed_incoming_requests =
431                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
432                    let removed_outgoing_requests =
433                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
434
435                    this.update(cx, |this, cx| {
436                        // Remove contacts
437                        this.contacts
438                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
439                        // Update existing contacts and insert new ones
440                        for updated_contact in updated_contacts {
441                            match this.contacts.binary_search_by_key(
442                                &&updated_contact.user.github_login,
443                                |contact| &contact.user.github_login,
444                            ) {
445                                Ok(ix) => this.contacts[ix] = updated_contact,
446                                Err(ix) => this.contacts.insert(ix, updated_contact),
447                            }
448                        }
449
450                        // Remove incoming contact requests
451                        this.incoming_contact_requests.retain(|user| {
452                            if removed_incoming_requests.contains(&user.id) {
453                                cx.emit(Event::Contact {
454                                    user: user.clone(),
455                                    kind: ContactEventKind::Cancelled,
456                                });
457                                false
458                            } else {
459                                true
460                            }
461                        });
462                        // Update existing incoming requests and insert new ones
463                        for user in incoming_requests {
464                            match this
465                                .incoming_contact_requests
466                                .binary_search_by_key(&&user.github_login, |contact| {
467                                    &contact.github_login
468                                }) {
469                                Ok(ix) => this.incoming_contact_requests[ix] = user,
470                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
471                            }
472                        }
473
474                        // Remove outgoing contact requests
475                        this.outgoing_contact_requests
476                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
477                        // Update existing incoming requests and insert new ones
478                        for request in outgoing_requests {
479                            match this
480                                .outgoing_contact_requests
481                                .binary_search_by_key(&&request.github_login, |contact| {
482                                    &contact.github_login
483                                }) {
484                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
485                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
486                            }
487                        }
488
489                        cx.notify();
490                    })?;
491
492                    Ok(())
493                })
494            }
495        }
496    }
497
498    pub fn contacts(&self) -> &[Arc<Contact>] {
499        &self.contacts
500    }
501
502    pub fn has_contact(&self, user: &Arc<User>) -> bool {
503        self.contacts
504            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
505            .is_ok()
506    }
507
508    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
509        &self.incoming_contact_requests
510    }
511
512    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
513        &self.outgoing_contact_requests
514    }
515
516    pub fn is_contact_request_pending(&self, user: &User) -> bool {
517        self.pending_contact_requests.contains_key(&user.id)
518    }
519
520    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
521        if self
522            .contacts
523            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
524            .is_ok()
525        {
526            ContactRequestStatus::RequestAccepted
527        } else if self
528            .outgoing_contact_requests
529            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
530            .is_ok()
531        {
532            ContactRequestStatus::RequestSent
533        } else if self
534            .incoming_contact_requests
535            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
536            .is_ok()
537        {
538            ContactRequestStatus::RequestReceived
539        } else {
540            ContactRequestStatus::None
541        }
542    }
543
544    pub fn request_contact(
545        &mut self,
546        responder_id: u64,
547        cx: &mut Context<Self>,
548    ) -> Task<Result<()>> {
549        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
550    }
551
552    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
553        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
554    }
555
556    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
557        self.incoming_contact_requests
558            .iter()
559            .any(|user| user.id == user_id)
560    }
561
562    pub fn respond_to_contact_request(
563        &mut self,
564        requester_id: u64,
565        accept: bool,
566        cx: &mut Context<Self>,
567    ) -> Task<Result<()>> {
568        self.perform_contact_request(
569            requester_id,
570            proto::RespondToContactRequest {
571                requester_id,
572                response: if accept {
573                    proto::ContactRequestResponse::Accept
574                } else {
575                    proto::ContactRequestResponse::Decline
576                } as i32,
577            },
578            cx,
579        )
580    }
581
582    pub fn dismiss_contact_request(
583        &self,
584        requester_id: u64,
585        cx: &Context<Self>,
586    ) -> Task<Result<()>> {
587        let client = self.client.upgrade();
588        cx.spawn(async move |_, _| {
589            client
590                .context("can't upgrade client reference")?
591                .request(proto::RespondToContactRequest {
592                    requester_id,
593                    response: proto::ContactRequestResponse::Dismiss as i32,
594                })
595                .await?;
596            Ok(())
597        })
598    }
599
600    fn perform_contact_request<T: RequestMessage>(
601        &mut self,
602        user_id: u64,
603        request: T,
604        cx: &mut Context<Self>,
605    ) -> Task<Result<()>> {
606        let client = self.client.upgrade();
607        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
608        cx.notify();
609
610        cx.spawn(async move |this, cx| {
611            let response = client
612                .context("can't upgrade client reference")?
613                .request(request)
614                .await;
615            this.update(cx, |this, cx| {
616                if let Entry::Occupied(mut request_count) =
617                    this.pending_contact_requests.entry(user_id)
618                {
619                    *request_count.get_mut() -= 1;
620                    if *request_count.get() == 0 {
621                        request_count.remove();
622                    }
623                }
624                cx.notify();
625            })?;
626            response?;
627            Ok(())
628        })
629    }
630
631    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
632        let (tx, mut rx) = postage::barrier::channel();
633        self.update_contacts_tx
634            .unbounded_send(UpdateContacts::Clear(tx))
635            .unwrap();
636        async move {
637            rx.next().await;
638        }
639    }
640
641    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
642        let (tx, mut rx) = postage::barrier::channel();
643        self.update_contacts_tx
644            .unbounded_send(UpdateContacts::Wait(tx))
645            .unwrap();
646        async move {
647            rx.next().await;
648        }
649    }
650
651    pub fn get_users(
652        &self,
653        user_ids: Vec<u64>,
654        cx: &Context<Self>,
655    ) -> Task<Result<Vec<Arc<User>>>> {
656        let mut user_ids_to_fetch = user_ids.clone();
657        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
658
659        cx.spawn(async move |this, cx| {
660            if !user_ids_to_fetch.is_empty() {
661                this.update(cx, |this, cx| {
662                    this.load_users(
663                        proto::GetUsers {
664                            user_ids: user_ids_to_fetch,
665                        },
666                        cx,
667                    )
668                })?
669                .await?;
670            }
671
672            this.read_with(cx, |this, _| {
673                user_ids
674                    .iter()
675                    .map(|user_id| {
676                        this.users
677                            .get(user_id)
678                            .cloned()
679                            .with_context(|| format!("user {user_id} not found"))
680                    })
681                    .collect()
682            })?
683        })
684    }
685
686    pub fn fuzzy_search_users(
687        &self,
688        query: String,
689        cx: &Context<Self>,
690    ) -> Task<Result<Vec<Arc<User>>>> {
691        self.load_users(proto::FuzzySearchUsers { query }, cx)
692    }
693
694    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
695        self.users.get(&user_id).cloned()
696    }
697
698    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
699        if let Some(user) = self.users.get(&user_id).cloned() {
700            return Some(user);
701        }
702
703        self.get_user(user_id, cx).detach_and_log_err(cx);
704        None
705    }
706
707    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
708        if let Some(user) = self.users.get(&user_id).cloned() {
709            return Task::ready(Ok(user));
710        }
711
712        let load_users = self.get_users(vec![user_id], cx);
713        cx.spawn(async move |this, cx| {
714            load_users.await?;
715            this.read_with(cx, |this, _| {
716                this.users
717                    .get(&user_id)
718                    .cloned()
719                    .context("server responded with no users")
720            })?
721        })
722    }
723
724    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
725        self.by_github_login
726            .get(github_login)
727            .and_then(|id| self.users.get(id).cloned())
728    }
729
730    pub fn current_user(&self) -> Option<Arc<User>> {
731        self.current_user.borrow().clone()
732    }
733
734    pub fn current_plan(&self) -> Option<proto::Plan> {
735        #[cfg(debug_assertions)]
736        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
737            return match plan.as_str() {
738                "free" => Some(proto::Plan::Free),
739                "trial" => Some(proto::Plan::ZedProTrial),
740                "pro" => Some(proto::Plan::ZedPro),
741                _ => {
742                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
743                }
744            };
745        }
746
747        self.current_plan
748    }
749
750    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
751        self.subscription_period
752    }
753
754    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
755        self.trial_started_at
756    }
757
758    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
759        self.is_usage_based_billing_enabled
760    }
761
762    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
763        self.current_user.clone()
764    }
765
766    /// Returns whether the user's account is too new to use the service.
767    pub fn account_too_young(&self) -> bool {
768        self.account_too_young.unwrap_or(false)
769    }
770
771    /// Returns whether the current user has overdue invoices and usage should be blocked.
772    pub fn has_overdue_invoices(&self) -> bool {
773        self.has_overdue_invoices.unwrap_or(false)
774    }
775
776    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
777        self.accepted_tos_at
778            .map(|accepted_tos_at| accepted_tos_at.is_some())
779    }
780
781    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
782        if self.current_user().is_none() {
783            return Task::ready(Err(anyhow!("no current user")));
784        };
785
786        let client = self.client.clone();
787        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
788            let client = client.upgrade().context("client not found")?;
789            let response = client
790                .request(proto::AcceptTermsOfService {})
791                .await
792                .context("error accepting tos")?;
793            this.update(cx, |this, cx| {
794                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
795                cx.emit(Event::PrivateUserInfoUpdated);
796            })?;
797            Ok(())
798        })
799    }
800
801    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
802        self.accepted_tos_at = Some(
803            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
804        );
805    }
806
807    fn load_users(
808        &self,
809        request: impl RequestMessage<Response = UsersResponse>,
810        cx: &Context<Self>,
811    ) -> Task<Result<Vec<Arc<User>>>> {
812        let client = self.client.clone();
813        cx.spawn(async move |this, cx| {
814            if let Some(rpc) = client.upgrade() {
815                let response = rpc.request(request).await.context("error loading users")?;
816                let users = response.users;
817
818                this.update(cx, |this, _| this.insert(users))
819            } else {
820                Ok(Vec::new())
821            }
822        })
823    }
824
825    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
826        let mut ret = Vec::with_capacity(users.len());
827        for user in users {
828            let user = User::new(user);
829            if let Some(old) = self.users.insert(user.id, user.clone()) {
830                if old.github_login != user.github_login {
831                    self.by_github_login.remove(&old.github_login);
832                }
833            }
834            self.by_github_login
835                .insert(user.github_login.clone(), user.id);
836            ret.push(user)
837        }
838        ret
839    }
840
841    pub fn set_participant_indices(
842        &mut self,
843        participant_indices: HashMap<u64, ParticipantIndex>,
844        cx: &mut Context<Self>,
845    ) {
846        if participant_indices != self.participant_indices {
847            self.participant_indices = participant_indices;
848            cx.emit(Event::ParticipantIndicesChanged);
849        }
850    }
851
852    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
853        &self.participant_indices
854    }
855
856    pub fn participant_names(
857        &self,
858        user_ids: impl Iterator<Item = u64>,
859        cx: &App,
860    ) -> HashMap<u64, SharedString> {
861        let mut ret = HashMap::default();
862        let mut missing_user_ids = Vec::new();
863        for id in user_ids {
864            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
865                ret.insert(id, github_login);
866            } else {
867                missing_user_ids.push(id)
868            }
869        }
870        if !missing_user_ids.is_empty() {
871            let this = self.weak_self.clone();
872            cx.spawn(async move |cx| {
873                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
874                    .await
875            })
876            .detach_and_log_err(cx);
877        }
878        ret
879    }
880}
881
882impl User {
883    fn new(message: proto::User) -> Arc<Self> {
884        Arc::new(User {
885            id: message.id,
886            github_login: message.github_login.into(),
887            avatar_uri: message.avatar_url.into(),
888            name: message.name,
889        })
890    }
891}
892
893impl Contact {
894    async fn from_proto(
895        contact: proto::Contact,
896        user_store: &Entity<UserStore>,
897        cx: &mut AsyncApp,
898    ) -> Result<Self> {
899        let user = user_store
900            .update(cx, |user_store, cx| {
901                user_store.get_user(contact.user_id, cx)
902            })?
903            .await?;
904        Ok(Self {
905            user,
906            online: contact.online,
907            busy: contact.busy,
908        })
909    }
910}
911
912impl Collaborator {
913    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
914        Ok(Self {
915            peer_id: message.peer_id.context("invalid peer id")?,
916            replica_id: message.replica_id as ReplicaId,
917            user_id: message.user_id as UserId,
918            is_host: message.is_host,
919            committer_name: message.committer_name,
920            committer_email: message.committer_email,
921        })
922    }
923}
924
925impl RequestUsage {
926    pub fn over_limit(&self) -> bool {
927        match self.limit {
928            UsageLimit::Limited(limit) => self.amount >= limit,
929            UsageLimit::Unlimited => false,
930        }
931    }
932
933    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
934        let limit = match limit.variant? {
935            proto::usage_limit::Variant::Limited(limited) => {
936                UsageLimit::Limited(limited.limit as i32)
937            }
938            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
939        };
940        Some(RequestUsage {
941            limit,
942            amount: amount as i32,
943        })
944    }
945
946    fn from_headers(
947        limit_name: &str,
948        amount_name: &str,
949        headers: &HeaderMap<HeaderValue>,
950    ) -> Result<Self> {
951        let limit = headers
952            .get(limit_name)
953            .with_context(|| format!("missing {limit_name:?} header"))?;
954        let limit = UsageLimit::from_str(limit.to_str()?)?;
955
956        let amount = headers
957            .get(amount_name)
958            .with_context(|| format!("missing {amount_name:?} header"))?;
959        let amount = amount.to_str()?.parse::<i32>()?;
960
961        Ok(Self { limit, amount })
962    }
963}
964
965impl ModelRequestUsage {
966    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
967        Ok(Self(RequestUsage::from_headers(
968            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
969            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
970            headers,
971        )?))
972    }
973}
974
975impl EditPredictionUsage {
976    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
977        Ok(Self(RequestUsage::from_headers(
978            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
979            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
980            headers,
981        )?))
982    }
983}