user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use cloud_api_client::websocket_protocol::MessageToClient;
  5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
  6use cloud_llm_client::{
  7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
  8    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  9};
 10use collections::{HashMap, HashSet, hash_map::Entry};
 11use derive_more::Deref;
 12use feature_flags::FeatureFlagAppExt;
 13use futures::{Future, StreamExt, channel::mpsc};
 14use gpui::{
 15    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 16};
 17use http_client::http::{HeaderMap, HeaderValue};
 18use postage::{sink::Sink, watch};
 19use rpc::proto::{RequestMessage, UsersResponse};
 20use std::{
 21    str::FromStr as _,
 22    sync::{Arc, Weak},
 23};
 24use text::ReplicaId;
 25use util::{ResultExt, TryFutureExt as _};
 26
 27pub type UserId = u64;
 28
 29#[derive(
 30    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 31)]
 32pub struct ChannelId(pub u64);
 33
 34impl std::fmt::Display for ChannelId {
 35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 36        self.0.fmt(f)
 37    }
 38}
 39
 40#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 41pub struct ProjectId(pub u64);
 42
 43impl ProjectId {
 44    pub fn to_proto(self) -> u64 {
 45        self.0
 46    }
 47}
 48
 49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 50pub struct ParticipantIndex(pub u32);
 51
 52#[derive(Default, Debug)]
 53pub struct User {
 54    pub id: UserId,
 55    pub github_login: SharedString,
 56    pub avatar_uri: SharedUri,
 57    pub name: Option<String>,
 58}
 59
 60#[derive(Clone, Debug, PartialEq, Eq)]
 61pub struct Collaborator {
 62    pub peer_id: proto::PeerId,
 63    pub replica_id: ReplicaId,
 64    pub user_id: UserId,
 65    pub is_host: bool,
 66    pub committer_name: Option<String>,
 67    pub committer_email: Option<String>,
 68}
 69
 70impl PartialOrd for User {
 71    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 72        Some(self.cmp(other))
 73    }
 74}
 75
 76impl Ord for User {
 77    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 78        self.github_login.cmp(&other.github_login)
 79    }
 80}
 81
 82impl PartialEq for User {
 83    fn eq(&self, other: &Self) -> bool {
 84        self.id == other.id && self.github_login == other.github_login
 85    }
 86}
 87
 88impl Eq for User {}
 89
 90#[derive(Debug, PartialEq)]
 91pub struct Contact {
 92    pub user: Arc<User>,
 93    pub online: bool,
 94    pub busy: bool,
 95}
 96
 97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 98pub enum ContactRequestStatus {
 99    None,
100    RequestSent,
101    RequestReceived,
102    RequestAccepted,
103}
104
105pub struct UserStore {
106    users: HashMap<u64, Arc<User>>,
107    by_github_login: HashMap<SharedString, u64>,
108    participant_indices: HashMap<u64, ParticipantIndex>,
109    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
110    model_request_usage: Option<ModelRequestUsage>,
111    edit_prediction_usage: Option<EditPredictionUsage>,
112    plan_info: Option<PlanInfo>,
113    current_user: watch::Receiver<Option<Arc<User>>>,
114    contacts: Vec<Arc<Contact>>,
115    incoming_contact_requests: Vec<Arc<User>>,
116    outgoing_contact_requests: Vec<Arc<User>>,
117    pending_contact_requests: HashMap<u64, usize>,
118    invite_info: Option<InviteInfo>,
119    client: Weak<Client>,
120    _maintain_contacts: Task<()>,
121    _maintain_current_user: Task<Result<()>>,
122    weak_self: WeakEntity<Self>,
123}
124
125#[derive(Clone)]
126pub struct InviteInfo {
127    pub count: u32,
128    pub url: Arc<str>,
129}
130
131pub enum Event {
132    Contact {
133        user: Arc<User>,
134        kind: ContactEventKind,
135    },
136    ShowContacts,
137    ParticipantIndicesChanged,
138    PrivateUserInfoUpdated,
139    PlanUpdated,
140}
141
142#[derive(Clone, Copy)]
143pub enum ContactEventKind {
144    Requested,
145    Accepted,
146    Cancelled,
147}
148
149impl EventEmitter<Event> for UserStore {}
150
151enum UpdateContacts {
152    Update(proto::UpdateContacts),
153    Wait(postage::barrier::Sender),
154    Clear(postage::barrier::Sender),
155}
156
157#[derive(Debug, Clone, Copy, Deref)]
158pub struct ModelRequestUsage(pub RequestUsage);
159
160#[derive(Debug, Clone, Copy, Deref)]
161pub struct EditPredictionUsage(pub RequestUsage);
162
163#[derive(Debug, Clone, Copy)]
164pub struct RequestUsage {
165    pub limit: UsageLimit,
166    pub amount: i32,
167}
168
169impl UserStore {
170    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
171        let (mut current_user_tx, current_user_rx) = watch::channel();
172        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
173        let rpc_subscriptions = vec![
174            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
175            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
176            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
177        ];
178
179        client.add_message_to_client_handler({
180            let this = cx.weak_entity();
181            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
182        });
183
184        Self {
185            users: Default::default(),
186            by_github_login: Default::default(),
187            current_user: current_user_rx,
188            plan_info: None,
189            model_request_usage: None,
190            edit_prediction_usage: None,
191            contacts: Default::default(),
192            incoming_contact_requests: Default::default(),
193            participant_indices: Default::default(),
194            outgoing_contact_requests: Default::default(),
195            invite_info: None,
196            client: Arc::downgrade(&client),
197            update_contacts_tx,
198            _maintain_contacts: cx.spawn(async move |this, cx| {
199                let _subscriptions = rpc_subscriptions;
200                while let Some(message) = update_contacts_rx.next().await {
201                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
202                    {
203                        task.log_err().await;
204                    } else {
205                        break;
206                    }
207                }
208            }),
209            _maintain_current_user: cx.spawn(async move |this, cx| {
210                let mut status = client.status();
211                let weak = Arc::downgrade(&client);
212                drop(client);
213                while let Some(status) = status.next().await {
214                    // if the client is dropped, the app is shutting down.
215                    let Some(client) = weak.upgrade() else {
216                        return Ok(());
217                    };
218                    match status {
219                        Status::Authenticated
220                        | Status::Reauthenticated
221                        | Status::Connected { .. } => {
222                            if let Some(user_id) = client.user_id() {
223                                let response = client
224                                    .cloud_client()
225                                    .get_authenticated_user()
226                                    .await
227                                    .log_err();
228
229                                let current_user_and_response = if let Some(response) = response {
230                                    let user = Arc::new(User {
231                                        id: user_id,
232                                        github_login: response.user.github_login.clone().into(),
233                                        avatar_uri: response.user.avatar_url.clone().into(),
234                                        name: response.user.name.clone(),
235                                    });
236
237                                    Some((user, response))
238                                } else {
239                                    None
240                                };
241                                current_user_tx
242                                    .send(
243                                        current_user_and_response
244                                            .as_ref()
245                                            .map(|(user, _)| user.clone()),
246                                    )
247                                    .await
248                                    .ok();
249
250                                cx.update(|cx| {
251                                    if let Some((user, response)) = current_user_and_response {
252                                        this.update(cx, |this, cx| {
253                                            this.by_github_login
254                                                .insert(user.github_login.clone(), user_id);
255                                            this.users.insert(user_id, user);
256                                            this.update_authenticated_user(response, cx)
257                                        })
258                                    } else {
259                                        anyhow::Ok(())
260                                    }
261                                })??;
262
263                                this.update(cx, |_, cx| cx.notify())?;
264                            }
265                        }
266                        Status::SignedOut => {
267                            current_user_tx.send(None).await.ok();
268                            this.update(cx, |this, cx| {
269                                cx.emit(Event::PrivateUserInfoUpdated);
270                                cx.notify();
271                                this.clear_contacts()
272                            })?
273                            .await;
274                        }
275                        Status::ConnectionLost => {
276                            this.update(cx, |this, cx| {
277                                cx.notify();
278                                this.clear_contacts()
279                            })?
280                            .await;
281                        }
282                        _ => {}
283                    }
284                }
285                Ok(())
286            }),
287            pending_contact_requests: Default::default(),
288            weak_self: cx.weak_entity(),
289        }
290    }
291
292    #[cfg(feature = "test-support")]
293    pub fn clear_cache(&mut self) {
294        self.users.clear();
295        self.by_github_login.clear();
296    }
297
298    async fn handle_update_invite_info(
299        this: Entity<Self>,
300        message: TypedEnvelope<proto::UpdateInviteInfo>,
301        mut cx: AsyncApp,
302    ) -> Result<()> {
303        this.update(&mut cx, |this, cx| {
304            this.invite_info = Some(InviteInfo {
305                url: Arc::from(message.payload.url),
306                count: message.payload.count,
307            });
308            cx.notify();
309        })?;
310        Ok(())
311    }
312
313    async fn handle_show_contacts(
314        this: Entity<Self>,
315        _: TypedEnvelope<proto::ShowContacts>,
316        mut cx: AsyncApp,
317    ) -> Result<()> {
318        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
319        Ok(())
320    }
321
322    pub fn invite_info(&self) -> Option<&InviteInfo> {
323        self.invite_info.as_ref()
324    }
325
326    async fn handle_update_contacts(
327        this: Entity<Self>,
328        message: TypedEnvelope<proto::UpdateContacts>,
329        cx: AsyncApp,
330    ) -> Result<()> {
331        this.read_with(&cx, |this, _| {
332            this.update_contacts_tx
333                .unbounded_send(UpdateContacts::Update(message.payload))
334                .unwrap();
335        })?;
336        Ok(())
337    }
338
339    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
340        match message {
341            UpdateContacts::Wait(barrier) => {
342                drop(barrier);
343                Task::ready(Ok(()))
344            }
345            UpdateContacts::Clear(barrier) => {
346                self.contacts.clear();
347                self.incoming_contact_requests.clear();
348                self.outgoing_contact_requests.clear();
349                drop(barrier);
350                Task::ready(Ok(()))
351            }
352            UpdateContacts::Update(message) => {
353                let mut user_ids = HashSet::default();
354                for contact in &message.contacts {
355                    user_ids.insert(contact.user_id);
356                }
357                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
358                user_ids.extend(message.outgoing_requests.iter());
359
360                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
361                cx.spawn(async move |this, cx| {
362                    load_users.await?;
363
364                    // Users are fetched in parallel above and cached in call to get_users
365                    // No need to parallelize here
366                    let mut updated_contacts = Vec::new();
367                    let this = this.upgrade().context("can't upgrade user store handle")?;
368                    for contact in message.contacts {
369                        updated_contacts
370                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
371                    }
372
373                    let mut incoming_requests = Vec::new();
374                    for request in message.incoming_requests {
375                        incoming_requests.push({
376                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
377                                .await?
378                        });
379                    }
380
381                    let mut outgoing_requests = Vec::new();
382                    for requested_user_id in message.outgoing_requests {
383                        outgoing_requests.push(
384                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
385                                .await?,
386                        );
387                    }
388
389                    let removed_contacts =
390                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
391                    let removed_incoming_requests =
392                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
393                    let removed_outgoing_requests =
394                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
395
396                    this.update(cx, |this, cx| {
397                        // Remove contacts
398                        this.contacts
399                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
400                        // Update existing contacts and insert new ones
401                        for updated_contact in updated_contacts {
402                            match this.contacts.binary_search_by_key(
403                                &&updated_contact.user.github_login,
404                                |contact| &contact.user.github_login,
405                            ) {
406                                Ok(ix) => this.contacts[ix] = updated_contact,
407                                Err(ix) => this.contacts.insert(ix, updated_contact),
408                            }
409                        }
410
411                        // Remove incoming contact requests
412                        this.incoming_contact_requests.retain(|user| {
413                            if removed_incoming_requests.contains(&user.id) {
414                                cx.emit(Event::Contact {
415                                    user: user.clone(),
416                                    kind: ContactEventKind::Cancelled,
417                                });
418                                false
419                            } else {
420                                true
421                            }
422                        });
423                        // Update existing incoming requests and insert new ones
424                        for user in incoming_requests {
425                            match this
426                                .incoming_contact_requests
427                                .binary_search_by_key(&&user.github_login, |contact| {
428                                    &contact.github_login
429                                }) {
430                                Ok(ix) => this.incoming_contact_requests[ix] = user,
431                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
432                            }
433                        }
434
435                        // Remove outgoing contact requests
436                        this.outgoing_contact_requests
437                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
438                        // Update existing incoming requests and insert new ones
439                        for request in outgoing_requests {
440                            match this
441                                .outgoing_contact_requests
442                                .binary_search_by_key(&&request.github_login, |contact| {
443                                    &contact.github_login
444                                }) {
445                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
446                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
447                            }
448                        }
449
450                        cx.notify();
451                    })?;
452
453                    Ok(())
454                })
455            }
456        }
457    }
458
459    pub fn contacts(&self) -> &[Arc<Contact>] {
460        &self.contacts
461    }
462
463    pub fn has_contact(&self, user: &Arc<User>) -> bool {
464        self.contacts
465            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
466            .is_ok()
467    }
468
469    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
470        &self.incoming_contact_requests
471    }
472
473    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
474        &self.outgoing_contact_requests
475    }
476
477    pub fn is_contact_request_pending(&self, user: &User) -> bool {
478        self.pending_contact_requests.contains_key(&user.id)
479    }
480
481    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
482        if self
483            .contacts
484            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
485            .is_ok()
486        {
487            ContactRequestStatus::RequestAccepted
488        } else if self
489            .outgoing_contact_requests
490            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
491            .is_ok()
492        {
493            ContactRequestStatus::RequestSent
494        } else if self
495            .incoming_contact_requests
496            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
497            .is_ok()
498        {
499            ContactRequestStatus::RequestReceived
500        } else {
501            ContactRequestStatus::None
502        }
503    }
504
505    pub fn request_contact(
506        &mut self,
507        responder_id: u64,
508        cx: &mut Context<Self>,
509    ) -> Task<Result<()>> {
510        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
511    }
512
513    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
514        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
515    }
516
517    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
518        self.incoming_contact_requests
519            .iter()
520            .any(|user| user.id == user_id)
521    }
522
523    pub fn respond_to_contact_request(
524        &mut self,
525        requester_id: u64,
526        accept: bool,
527        cx: &mut Context<Self>,
528    ) -> Task<Result<()>> {
529        self.perform_contact_request(
530            requester_id,
531            proto::RespondToContactRequest {
532                requester_id,
533                response: if accept {
534                    proto::ContactRequestResponse::Accept
535                } else {
536                    proto::ContactRequestResponse::Decline
537                } as i32,
538            },
539            cx,
540        )
541    }
542
543    pub fn dismiss_contact_request(
544        &self,
545        requester_id: u64,
546        cx: &Context<Self>,
547    ) -> Task<Result<()>> {
548        let client = self.client.upgrade();
549        cx.spawn(async move |_, _| {
550            client
551                .context("can't upgrade client reference")?
552                .request(proto::RespondToContactRequest {
553                    requester_id,
554                    response: proto::ContactRequestResponse::Dismiss as i32,
555                })
556                .await?;
557            Ok(())
558        })
559    }
560
561    fn perform_contact_request<T: RequestMessage>(
562        &mut self,
563        user_id: u64,
564        request: T,
565        cx: &mut Context<Self>,
566    ) -> Task<Result<()>> {
567        let client = self.client.upgrade();
568        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
569        cx.notify();
570
571        cx.spawn(async move |this, cx| {
572            let response = client
573                .context("can't upgrade client reference")?
574                .request(request)
575                .await;
576            this.update(cx, |this, cx| {
577                if let Entry::Occupied(mut request_count) =
578                    this.pending_contact_requests.entry(user_id)
579                {
580                    *request_count.get_mut() -= 1;
581                    if *request_count.get() == 0 {
582                        request_count.remove();
583                    }
584                }
585                cx.notify();
586            })?;
587            response?;
588            Ok(())
589        })
590    }
591
592    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
593        let (tx, mut rx) = postage::barrier::channel();
594        self.update_contacts_tx
595            .unbounded_send(UpdateContacts::Clear(tx))
596            .unwrap();
597        async move {
598            rx.next().await;
599        }
600    }
601
602    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
603        let (tx, mut rx) = postage::barrier::channel();
604        self.update_contacts_tx
605            .unbounded_send(UpdateContacts::Wait(tx))
606            .unwrap();
607        async move {
608            rx.next().await;
609        }
610    }
611
612    pub fn get_users(
613        &self,
614        user_ids: Vec<u64>,
615        cx: &Context<Self>,
616    ) -> Task<Result<Vec<Arc<User>>>> {
617        let mut user_ids_to_fetch = user_ids.clone();
618        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
619
620        cx.spawn(async move |this, cx| {
621            if !user_ids_to_fetch.is_empty() {
622                this.update(cx, |this, cx| {
623                    this.load_users(
624                        proto::GetUsers {
625                            user_ids: user_ids_to_fetch,
626                        },
627                        cx,
628                    )
629                })?
630                .await?;
631            }
632
633            this.read_with(cx, |this, _| {
634                user_ids
635                    .iter()
636                    .map(|user_id| {
637                        this.users
638                            .get(user_id)
639                            .cloned()
640                            .with_context(|| format!("user {user_id} not found"))
641                    })
642                    .collect()
643            })?
644        })
645    }
646
647    pub fn fuzzy_search_users(
648        &self,
649        query: String,
650        cx: &Context<Self>,
651    ) -> Task<Result<Vec<Arc<User>>>> {
652        self.load_users(proto::FuzzySearchUsers { query }, cx)
653    }
654
655    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
656        self.users.get(&user_id).cloned()
657    }
658
659    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
660        if let Some(user) = self.users.get(&user_id).cloned() {
661            return Some(user);
662        }
663
664        self.get_user(user_id, cx).detach_and_log_err(cx);
665        None
666    }
667
668    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
669        if let Some(user) = self.users.get(&user_id).cloned() {
670            return Task::ready(Ok(user));
671        }
672
673        let load_users = self.get_users(vec![user_id], cx);
674        cx.spawn(async move |this, cx| {
675            load_users.await?;
676            this.read_with(cx, |this, _| {
677                this.users
678                    .get(&user_id)
679                    .cloned()
680                    .context("server responded with no users")
681            })?
682        })
683    }
684
685    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
686        self.by_github_login
687            .get(github_login)
688            .and_then(|id| self.users.get(id).cloned())
689    }
690
691    pub fn current_user(&self) -> Option<Arc<User>> {
692        self.current_user.borrow().clone()
693    }
694
695    pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
696        #[cfg(debug_assertions)]
697        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
698            return match plan.as_str() {
699                "free" => Some(cloud_llm_client::Plan::ZedFree),
700                "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
701                "pro" => Some(cloud_llm_client::Plan::ZedPro),
702                _ => {
703                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
704                }
705            };
706        }
707
708        self.plan_info.as_ref().map(|info| info.plan)
709    }
710
711    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
712        self.plan_info
713            .as_ref()
714            .and_then(|plan| plan.subscription_period)
715            .map(|subscription_period| {
716                (
717                    subscription_period.started_at.0,
718                    subscription_period.ended_at.0,
719                )
720            })
721    }
722
723    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
724        self.plan_info
725            .as_ref()
726            .and_then(|plan| plan.trial_started_at)
727            .map(|trial_started_at| trial_started_at.0)
728    }
729
730    /// Returns whether the user's account is too new to use the service.
731    pub fn account_too_young(&self) -> bool {
732        self.plan_info
733            .as_ref()
734            .map(|plan| plan.is_account_too_young)
735            .unwrap_or_default()
736    }
737
738    /// Returns whether the current user has overdue invoices and usage should be blocked.
739    pub fn has_overdue_invoices(&self) -> bool {
740        self.plan_info
741            .as_ref()
742            .map(|plan| plan.has_overdue_invoices)
743            .unwrap_or_default()
744    }
745
746    pub fn is_usage_based_billing_enabled(&self) -> bool {
747        self.plan_info
748            .as_ref()
749            .map(|plan| plan.is_usage_based_billing_enabled)
750            .unwrap_or_default()
751    }
752
753    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
754        self.model_request_usage
755    }
756
757    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
758        self.model_request_usage = Some(usage);
759        cx.notify();
760    }
761
762    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
763        self.edit_prediction_usage
764    }
765
766    pub fn update_edit_prediction_usage(
767        &mut self,
768        usage: EditPredictionUsage,
769        cx: &mut Context<Self>,
770    ) {
771        self.edit_prediction_usage = Some(usage);
772        cx.notify();
773    }
774
775    fn update_authenticated_user(
776        &mut self,
777        response: GetAuthenticatedUserResponse,
778        cx: &mut Context<Self>,
779    ) {
780        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
781        cx.update_flags(staff, response.feature_flags);
782        if let Some(client) = self.client.upgrade() {
783            client
784                .telemetry
785                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
786        }
787
788        self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
789            limit: response.plan.usage.model_requests.limit,
790            amount: response.plan.usage.model_requests.used as i32,
791        }));
792        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
793            limit: response.plan.usage.edit_predictions.limit,
794            amount: response.plan.usage.edit_predictions.used as i32,
795        }));
796        self.plan_info = Some(response.plan);
797        cx.emit(Event::PrivateUserInfoUpdated);
798    }
799
800    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
801        cx.spawn(async move |cx| {
802            match message {
803                MessageToClient::UserUpdated => {
804                    let cloud_client = cx
805                        .update(|cx| {
806                            this.read_with(cx, |this, _cx| {
807                                this.client.upgrade().map(|client| client.cloud_client())
808                            })
809                        })??
810                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
811
812                    let response = cloud_client.get_authenticated_user().await?;
813                    cx.update(|cx| {
814                        this.update(cx, |this, cx| {
815                            this.update_authenticated_user(response, cx);
816                        })
817                    })??;
818                }
819            }
820
821            anyhow::Ok(())
822        })
823        .detach_and_log_err(cx);
824    }
825
826    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
827        self.current_user.clone()
828    }
829
830    fn load_users(
831        &self,
832        request: impl RequestMessage<Response = UsersResponse>,
833        cx: &Context<Self>,
834    ) -> Task<Result<Vec<Arc<User>>>> {
835        let client = self.client.clone();
836        cx.spawn(async move |this, cx| {
837            if let Some(rpc) = client.upgrade() {
838                let response = rpc.request(request).await.context("error loading users")?;
839                let users = response.users;
840
841                this.update(cx, |this, _| this.insert(users))
842            } else {
843                Ok(Vec::new())
844            }
845        })
846    }
847
848    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
849        let mut ret = Vec::with_capacity(users.len());
850        for user in users {
851            let user = User::new(user);
852            if let Some(old) = self.users.insert(user.id, user.clone())
853                && old.github_login != user.github_login
854            {
855                self.by_github_login.remove(&old.github_login);
856            }
857            self.by_github_login
858                .insert(user.github_login.clone(), user.id);
859            ret.push(user)
860        }
861        ret
862    }
863
864    pub fn set_participant_indices(
865        &mut self,
866        participant_indices: HashMap<u64, ParticipantIndex>,
867        cx: &mut Context<Self>,
868    ) {
869        if participant_indices != self.participant_indices {
870            self.participant_indices = participant_indices;
871            cx.emit(Event::ParticipantIndicesChanged);
872        }
873    }
874
875    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
876        &self.participant_indices
877    }
878
879    pub fn participant_names(
880        &self,
881        user_ids: impl Iterator<Item = u64>,
882        cx: &App,
883    ) -> HashMap<u64, SharedString> {
884        let mut ret = HashMap::default();
885        let mut missing_user_ids = Vec::new();
886        for id in user_ids {
887            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
888                ret.insert(id, github_login);
889            } else {
890                missing_user_ids.push(id)
891            }
892        }
893        if !missing_user_ids.is_empty() {
894            let this = self.weak_self.clone();
895            cx.spawn(async move |cx| {
896                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
897                    .await
898            })
899            .detach_and_log_err(cx);
900        }
901        ret
902    }
903}
904
905impl User {
906    fn new(message: proto::User) -> Arc<Self> {
907        Arc::new(User {
908            id: message.id,
909            github_login: message.github_login.into(),
910            avatar_uri: message.avatar_url.into(),
911            name: message.name,
912        })
913    }
914}
915
916impl Contact {
917    async fn from_proto(
918        contact: proto::Contact,
919        user_store: &Entity<UserStore>,
920        cx: &mut AsyncApp,
921    ) -> Result<Self> {
922        let user = user_store
923            .update(cx, |user_store, cx| {
924                user_store.get_user(contact.user_id, cx)
925            })?
926            .await?;
927        Ok(Self {
928            user,
929            online: contact.online,
930            busy: contact.busy,
931        })
932    }
933}
934
935impl Collaborator {
936    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
937        Ok(Self {
938            peer_id: message.peer_id.context("invalid peer id")?,
939            replica_id: message.replica_id as ReplicaId,
940            user_id: message.user_id as UserId,
941            is_host: message.is_host,
942            committer_name: message.committer_name,
943            committer_email: message.committer_email,
944        })
945    }
946}
947
948impl RequestUsage {
949    pub fn over_limit(&self) -> bool {
950        match self.limit {
951            UsageLimit::Limited(limit) => self.amount >= limit,
952            UsageLimit::Unlimited => false,
953        }
954    }
955
956    fn from_headers(
957        limit_name: &str,
958        amount_name: &str,
959        headers: &HeaderMap<HeaderValue>,
960    ) -> Result<Self> {
961        let limit = headers
962            .get(limit_name)
963            .with_context(|| format!("missing {limit_name:?} header"))?;
964        let limit = UsageLimit::from_str(limit.to_str()?)?;
965
966        let amount = headers
967            .get(amount_name)
968            .with_context(|| format!("missing {amount_name:?} header"))?;
969        let amount = amount.to_str()?.parse::<i32>()?;
970
971        Ok(Self { limit, amount })
972    }
973}
974
975impl ModelRequestUsage {
976    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
977        Ok(Self(RequestUsage::from_headers(
978            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
979            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
980            headers,
981        )?))
982    }
983}
984
985impl EditPredictionUsage {
986    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
987        Ok(Self(RequestUsage::from_headers(
988            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
989            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
990            headers,
991        )?))
992    }
993}