user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use cloud_api_client::websocket_protocol::MessageToClient;
  5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
  6use cloud_llm_client::{
  7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
  8    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, Plan,
  9    UsageLimit,
 10};
 11use collections::{HashMap, HashSet, hash_map::Entry};
 12use derive_more::Deref;
 13use feature_flags::FeatureFlagAppExt;
 14use futures::{Future, StreamExt, channel::mpsc};
 15use gpui::{
 16    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 17};
 18use http_client::http::{HeaderMap, HeaderValue};
 19use postage::{sink::Sink, watch};
 20use rpc::proto::{RequestMessage, UsersResponse};
 21use std::{
 22    str::FromStr as _,
 23    sync::{Arc, Weak},
 24};
 25use text::ReplicaId;
 26use util::{ResultExt, TryFutureExt as _};
 27
/// Numeric identifier of a user.
pub type UserId = u64;
 29
/// Newtype around the raw `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 34
 35impl std::fmt::Display for ChannelId {
 36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 37        self.0.fmt(f)
 38    }
 39}
 40
 41#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 42pub struct ProjectId(pub u64);
 43
 44impl ProjectId {
 45    pub fn to_proto(self) -> u64 {
 46        self.0
 47    }
 48}
 49
/// Newtype around a `u32` participant index; `UserStore` keeps one per user id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 52
/// A user known to the client.
///
/// Equality and ordering are implemented by hand further down (not derived).
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login; the key used for ordering and for login-based lookups.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional name of the user.
    pub name: Option<String>,
}
 60
/// A peer taking part in a shared session, together with the identity data attached to it.
// NOTE(review): field semantics below are inferred from names only — confirm against callers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Connection-level identity of the peer.
    pub peer_id: proto::PeerId,
    /// Replica id used by the text layer for this peer.
    pub replica_id: ReplicaId,
    /// Id of the user behind this peer.
    pub user_id: UserId,
    /// Whether this peer is the host.
    pub is_host: bool,
    /// Optional committer name for this peer.
    pub committer_name: Option<String>,
    /// Optional committer email for this peer.
    pub committer_email: Option<String>,
}
 70
/// Partial ordering for [`User`]; always `Some`, delegating to the total order in [`Ord`].
impl PartialOrd for User {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
 76
 77impl Ord for User {
 78    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 79        self.github_login.cmp(&other.github_login)
 80    }
 81}
 82
 83impl PartialEq for User {
 84    fn eq(&self, other: &Self) -> bool {
 85        self.id == other.id && self.github_login == other.github_login
 86    }
 87}
 88
 89impl Eq for User {}
 90
/// A contact of the current user plus presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Presence flag: whether the contact is online.
    pub online: bool,
    /// Presence flag: whether the contact is busy.
    pub busy: bool,
}
 97
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
105
/// Client-side cache of users, contacts, contact requests, and the current user's plan and
/// usage information.
pub struct UserStore {
    /// Cached users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// GitHub login to user id, for cached users.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Model request usage; set from the authenticated-user response or by explicit update.
    model_request_usage: Option<ModelRequestUsage>,
    /// Edit prediction usage; set from the authenticated-user response or by explicit update.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information from the most recent authenticated-user response.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the current user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task that applies queued contact updates one at a time.
    _maintain_contacts: Task<()>,
    /// Task that follows client status changes and keeps the current user up to date.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity.
    weak_self: WeakEntity<Self>,
}
125
/// Invite information received via `proto::UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// Count reported by the server alongside the invite URL.
    pub count: u32,
    /// Invite URL reported by the server.
    pub url: Arc<str>,
}
131
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` actually changes the stored indices.
    ParticipantIndicesChanged,
    /// Emitted on sign-out and whenever an authenticated-user response is applied.
    PrivateUserInfoUpdated,
    /// Plan-related update.
    PlanUpdated,
}
142
/// What happened to the contact request carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// A contact request was made.
    Requested,
    /// A contact request was accepted.
    Accepted,
    /// A contact request was cancelled; emitted when an incoming request is removed.
    Cancelled,
}
149
// Lets `UserStore` emit `Event` values through its context.
impl EventEmitter<Event> for UserStore {}
151
/// Messages processed, in order, by the contact maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// No-op marker: the barrier is dropped once all earlier messages have been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
157
/// [`RequestUsage`] for model requests; derefs to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
160
/// [`RequestUsage`] for edit predictions; derefs to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
163
/// A usage counter paired with its limit.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that applies to this kind of usage.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
169
170impl UserStore {
    /// Creates the store, registers its message handlers on `client`, and spawns the two
    /// background tasks that maintain contacts and the current user.
    ///
    /// Only a weak reference to `client` is stored on the returned value.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        // These subscriptions are moved into the `_maintain_contacts` task below.
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            plan_info: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            // Applies queued contact updates one at a time, awaiting each before taking the next.
            _maintain_contacts: cx.spawn(async move |this, cx| {
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            // Follows client status changes and refreshes or clears the current user accordingly.
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold the client only weakly so this task does not keep it alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no current user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        // Cache the user and apply plan, usage, and flag data.
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Emit and notify, then wait until the contact lists are cleared.
                            this.update(cx, |this, cx| {
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // The current user is kept; only the contact lists are cleared.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
292
293    #[cfg(feature = "test-support")]
294    pub fn clear_cache(&mut self) {
295        self.users.clear();
296        self.by_github_login.clear();
297    }
298
299    async fn handle_update_invite_info(
300        this: Entity<Self>,
301        message: TypedEnvelope<proto::UpdateInviteInfo>,
302        mut cx: AsyncApp,
303    ) -> Result<()> {
304        this.update(&mut cx, |this, cx| {
305            this.invite_info = Some(InviteInfo {
306                url: Arc::from(message.payload.url),
307                count: message.payload.count,
308            });
309            cx.notify();
310        })?;
311        Ok(())
312    }
313
314    async fn handle_show_contacts(
315        this: Entity<Self>,
316        _: TypedEnvelope<proto::ShowContacts>,
317        mut cx: AsyncApp,
318    ) -> Result<()> {
319        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
320        Ok(())
321    }
322
    /// Returns the most recently received invite info, if any.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
326
327    async fn handle_update_contacts(
328        this: Entity<Self>,
329        message: TypedEnvelope<proto::UpdateContacts>,
330        cx: AsyncApp,
331    ) -> Result<()> {
332        this.read_with(&cx, |this, _| {
333            this.update_contacts_tx
334                .unbounded_send(UpdateContacts::Update(message.payload))
335                .unwrap();
336        })?;
337        Ok(())
338    }
339
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// `Wait` and `Clear` complete immediately (dropping their barrier releases the waiter).
    /// `Update` first loads every referenced user, then merges the changes into the sorted
    /// contact and request lists and notifies observers.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the update mentions so they can be loaded in one request.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        // (the list stays sorted by GitHub login: insert at the search position)
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        // (each removal is reported as a `Cancelled` contact event)
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
459
    /// Returns the current contacts, sorted by GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
463
464    pub fn has_contact(&self, user: &Arc<User>) -> bool {
465        self.contacts
466            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
467            .is_ok()
468    }
469
    /// Returns the users who have sent a contact request, sorted by GitHub login.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
473
    /// Returns the users a contact request has been sent to, sorted by GitHub login.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
477
    /// Returns whether at least one contact-related request for `user` is still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
481
482    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
483        if self
484            .contacts
485            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
486            .is_ok()
487        {
488            ContactRequestStatus::RequestAccepted
489        } else if self
490            .outgoing_contact_requests
491            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
492            .is_ok()
493        {
494            ContactRequestStatus::RequestSent
495        } else if self
496            .incoming_contact_requests
497            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
498            .is_ok()
499        {
500            ContactRequestStatus::RequestReceived
501        } else {
502            ContactRequestStatus::None
503        }
504    }
505
506    pub fn request_contact(
507        &mut self,
508        responder_id: u64,
509        cx: &mut Context<Self>,
510    ) -> Task<Result<()>> {
511        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
512    }
513
514    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
515        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
516    }
517
518    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
519        self.incoming_contact_requests
520            .iter()
521            .any(|user| user.id == user_id)
522    }
523
524    pub fn respond_to_contact_request(
525        &mut self,
526        requester_id: u64,
527        accept: bool,
528        cx: &mut Context<Self>,
529    ) -> Task<Result<()>> {
530        self.perform_contact_request(
531            requester_id,
532            proto::RespondToContactRequest {
533                requester_id,
534                response: if accept {
535                    proto::ContactRequestResponse::Accept
536                } else {
537                    proto::ContactRequestResponse::Decline
538                } as i32,
539            },
540            cx,
541        )
542    }
543
544    pub fn dismiss_contact_request(
545        &self,
546        requester_id: u64,
547        cx: &Context<Self>,
548    ) -> Task<Result<()>> {
549        let client = self.client.upgrade();
550        cx.spawn(async move |_, _| {
551            client
552                .context("can't upgrade client reference")?
553                .request(proto::RespondToContactRequest {
554                    requester_id,
555                    response: proto::ContactRequestResponse::Dismiss as i32,
556                })
557                .await?;
558            Ok(())
559        })
560    }
561
562    fn perform_contact_request<T: RequestMessage>(
563        &mut self,
564        user_id: u64,
565        request: T,
566        cx: &mut Context<Self>,
567    ) -> Task<Result<()>> {
568        let client = self.client.upgrade();
569        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
570        cx.notify();
571
572        cx.spawn(async move |this, cx| {
573            let response = client
574                .context("can't upgrade client reference")?
575                .request(request)
576                .await;
577            this.update(cx, |this, cx| {
578                if let Entry::Occupied(mut request_count) =
579                    this.pending_contact_requests.entry(user_id)
580                {
581                    *request_count.get_mut() -= 1;
582                    if *request_count.get() == 0 {
583                        request_count.remove();
584                    }
585                }
586                cx.notify();
587            })?;
588            response?;
589            Ok(())
590        })
591    }
592
593    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
594        let (tx, mut rx) = postage::barrier::channel();
595        self.update_contacts_tx
596            .unbounded_send(UpdateContacts::Clear(tx))
597            .unwrap();
598        async move {
599            rx.next().await;
600        }
601    }
602
603    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
604        let (tx, mut rx) = postage::barrier::channel();
605        self.update_contacts_tx
606            .unbounded_send(UpdateContacts::Wait(tx))
607            .unwrap();
608        async move {
609            rx.next().await;
610        }
611    }
612
613    pub fn get_users(
614        &self,
615        user_ids: Vec<u64>,
616        cx: &Context<Self>,
617    ) -> Task<Result<Vec<Arc<User>>>> {
618        let mut user_ids_to_fetch = user_ids.clone();
619        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
620
621        cx.spawn(async move |this, cx| {
622            if !user_ids_to_fetch.is_empty() {
623                this.update(cx, |this, cx| {
624                    this.load_users(
625                        proto::GetUsers {
626                            user_ids: user_ids_to_fetch,
627                        },
628                        cx,
629                    )
630                })?
631                .await?;
632            }
633
634            this.read_with(cx, |this, _| {
635                user_ids
636                    .iter()
637                    .map(|user_id| {
638                        this.users
639                            .get(user_id)
640                            .cloned()
641                            .with_context(|| format!("user {user_id} not found"))
642                    })
643                    .collect()
644            })?
645        })
646    }
647
648    pub fn fuzzy_search_users(
649        &self,
650        query: String,
651        cx: &Context<Self>,
652    ) -> Task<Result<Vec<Arc<User>>>> {
653        self.load_users(proto::FuzzySearchUsers { query }, cx)
654    }
655
656    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
657        self.users.get(&user_id).cloned()
658    }
659
660    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
661        if let Some(user) = self.users.get(&user_id).cloned() {
662            return Some(user);
663        }
664
665        self.get_user(user_id, cx).detach_and_log_err(cx);
666        None
667    }
668
669    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
670        if let Some(user) = self.users.get(&user_id).cloned() {
671            return Task::ready(Ok(user));
672        }
673
674        let load_users = self.get_users(vec![user_id], cx);
675        cx.spawn(async move |this, cx| {
676            load_users.await?;
677            this.read_with(cx, |this, _| {
678                this.users
679                    .get(&user_id)
680                    .cloned()
681                    .context("server responded with no users")
682            })?
683        })
684    }
685
686    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
687        self.by_github_login
688            .get(github_login)
689            .and_then(|id| self.users.get(id).cloned())
690    }
691
    /// Returns a snapshot of the current user, or `None` when there is none.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
695
    /// Returns the current plan, if plan information is available.
    ///
    /// In builds with debug assertions, the `ZED_SIMULATE_PLAN` environment variable overrides
    /// the real plan; it must be `free`, `trial`, or `pro`.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions, panics if `ZED_SIMULATE_PLAN` is set to any other value.
    pub fn plan(&self) -> Option<Plan> {
        #[cfg(debug_assertions)]
        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
            use cloud_llm_client::PlanV1;

            return match plan.as_str() {
                "free" => Some(Plan::V1(PlanV1::ZedFree)),
                "trial" => Some(Plan::V1(PlanV1::ZedProTrial)),
                "pro" => Some(Plan::V1(PlanV1::ZedPro)),
                _ => {
                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
                }
            };
        }

        self.plan_info.as_ref().map(|info| info.plan())
    }
713
714    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
715        self.plan_info
716            .as_ref()
717            .and_then(|plan| plan.subscription_period)
718            .map(|subscription_period| {
719                (
720                    subscription_period.started_at.0,
721                    subscription_period.ended_at.0,
722                )
723            })
724    }
725
726    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
727        self.plan_info
728            .as_ref()
729            .and_then(|plan| plan.trial_started_at)
730            .map(|trial_started_at| trial_started_at.0)
731    }
732
733    /// Returns whether the user's account is too new to use the service.
734    pub fn account_too_young(&self) -> bool {
735        self.plan_info
736            .as_ref()
737            .map(|plan| plan.is_account_too_young)
738            .unwrap_or_default()
739    }
740
741    /// Returns whether the current user has overdue invoices and usage should be blocked.
742    pub fn has_overdue_invoices(&self) -> bool {
743        self.plan_info
744            .as_ref()
745            .map(|plan| plan.has_overdue_invoices)
746            .unwrap_or_default()
747    }
748
749    pub fn is_usage_based_billing_enabled(&self) -> bool {
750        self.plan_info
751            .as_ref()
752            .map(|plan| plan.is_usage_based_billing_enabled)
753            .unwrap_or_default()
754    }
755
    /// Returns the last known model request usage, if any.
    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
        self.model_request_usage
    }
759
    /// Replaces the stored model request usage and notifies observers.
    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
        self.model_request_usage = Some(usage);
        cx.notify();
    }
764
    /// Returns the last known edit prediction usage, if any.
    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
        self.edit_prediction_usage
    }
768
    /// Replaces the stored edit prediction usage and notifies observers.
    pub fn update_edit_prediction_usage(
        &mut self,
        usage: EditPredictionUsage,
        cx: &mut Context<Self>,
    ) {
        self.edit_prediction_usage = Some(usage);
        cx.notify();
    }
777
778    fn update_authenticated_user(
779        &mut self,
780        response: GetAuthenticatedUserResponse,
781        cx: &mut Context<Self>,
782    ) {
783        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
784        cx.update_flags(staff, response.feature_flags);
785        if let Some(client) = self.client.upgrade() {
786            client
787                .telemetry
788                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
789        }
790
791        self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
792            limit: response.plan.usage.model_requests.limit,
793            amount: response.plan.usage.model_requests.used as i32,
794        }));
795        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
796            limit: response.plan.usage.edit_predictions.limit,
797            amount: response.plan.usage.edit_predictions.used as i32,
798        }));
799        self.plan_info = Some(response.plan);
800        cx.emit(Event::PrivateUserInfoUpdated);
801    }
802
803    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
804        cx.spawn(async move |cx| {
805            match message {
806                MessageToClient::UserUpdated => {
807                    let cloud_client = cx
808                        .update(|cx| {
809                            this.read_with(cx, |this, _cx| {
810                                this.client.upgrade().map(|client| client.cloud_client())
811                            })
812                        })??
813                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
814
815                    let response = cloud_client.get_authenticated_user().await?;
816                    cx.update(|cx| {
817                        this.update(cx, |this, cx| {
818                            this.update_authenticated_user(response, cx);
819                        })
820                    })??;
821                }
822            }
823
824            anyhow::Ok(())
825        })
826        .detach_and_log_err(cx);
827    }
828
    /// Returns a new receiver on the current-user watch channel.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
832
833    fn load_users(
834        &self,
835        request: impl RequestMessage<Response = UsersResponse>,
836        cx: &Context<Self>,
837    ) -> Task<Result<Vec<Arc<User>>>> {
838        let client = self.client.clone();
839        cx.spawn(async move |this, cx| {
840            if let Some(rpc) = client.upgrade() {
841                let response = rpc.request(request).await.context("error loading users")?;
842                let users = response.users;
843
844                this.update(cx, |this, _| this.insert(users))
845            } else {
846                Ok(Vec::new())
847            }
848        })
849    }
850
851    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
852        let mut ret = Vec::with_capacity(users.len());
853        for user in users {
854            let user = User::new(user);
855            if let Some(old) = self.users.insert(user.id, user.clone())
856                && old.github_login != user.github_login
857            {
858                self.by_github_login.remove(&old.github_login);
859            }
860            self.by_github_login
861                .insert(user.github_login.clone(), user.id);
862            ret.push(user)
863        }
864        ret
865    }
866
867    pub fn set_participant_indices(
868        &mut self,
869        participant_indices: HashMap<u64, ParticipantIndex>,
870        cx: &mut Context<Self>,
871    ) {
872        if participant_indices != self.participant_indices {
873            self.participant_indices = participant_indices;
874            cx.emit(Event::ParticipantIndicesChanged);
875        }
876    }
877
878    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
879        &self.participant_indices
880    }
881
882    pub fn participant_names(
883        &self,
884        user_ids: impl Iterator<Item = u64>,
885        cx: &App,
886    ) -> HashMap<u64, SharedString> {
887        let mut ret = HashMap::default();
888        let mut missing_user_ids = Vec::new();
889        for id in user_ids {
890            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
891                ret.insert(id, github_login);
892            } else {
893                missing_user_ids.push(id)
894            }
895        }
896        if !missing_user_ids.is_empty() {
897            let this = self.weak_self.clone();
898            cx.spawn(async move |cx| {
899                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
900                    .await
901            })
902            .detach_and_log_err(cx);
903        }
904        ret
905    }
906}
907
908impl User {
909    fn new(message: proto::User) -> Arc<Self> {
910        Arc::new(User {
911            id: message.id,
912            github_login: message.github_login.into(),
913            avatar_uri: message.avatar_url.into(),
914            name: message.name,
915        })
916    }
917}
918
919impl Contact {
920    async fn from_proto(
921        contact: proto::Contact,
922        user_store: &Entity<UserStore>,
923        cx: &mut AsyncApp,
924    ) -> Result<Self> {
925        let user = user_store
926            .update(cx, |user_store, cx| {
927                user_store.get_user(contact.user_id, cx)
928            })?
929            .await?;
930        Ok(Self {
931            user,
932            online: contact.online,
933            busy: contact.busy,
934        })
935    }
936}
937
938impl Collaborator {
939    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
940        Ok(Self {
941            peer_id: message.peer_id.context("invalid peer id")?,
942            replica_id: message.replica_id as ReplicaId,
943            user_id: message.user_id as UserId,
944            is_host: message.is_host,
945            committer_name: message.committer_name,
946            committer_email: message.committer_email,
947        })
948    }
949}
950
951impl RequestUsage {
952    pub fn over_limit(&self) -> bool {
953        match self.limit {
954            UsageLimit::Limited(limit) => self.amount >= limit,
955            UsageLimit::Unlimited => false,
956        }
957    }
958
959    fn from_headers(
960        limit_name: &str,
961        amount_name: &str,
962        headers: &HeaderMap<HeaderValue>,
963    ) -> Result<Self> {
964        let limit = headers
965            .get(limit_name)
966            .with_context(|| format!("missing {limit_name:?} header"))?;
967        let limit = UsageLimit::from_str(limit.to_str()?)?;
968
969        let amount = headers
970            .get(amount_name)
971            .with_context(|| format!("missing {amount_name:?} header"))?;
972        let amount = amount.to_str()?.parse::<i32>()?;
973
974        Ok(Self { limit, amount })
975    }
976}
977
978impl ModelRequestUsage {
979    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
980        Ok(Self(RequestUsage::from_headers(
981            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
982            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
983            headers,
984        )?))
985    }
986}
987
988impl EditPredictionUsage {
989    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
990        Ok(Self(RequestUsage::from_headers(
991            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
992            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
993            headers,
994        )?))
995    }
996}