user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use cloud_api_client::websocket_protocol::MessageToClient;
  5use cloud_api_client::{
  6    GetAuthenticatedUserResponse, Organization, OrganizationId, Plan, PlanInfo,
  7};
  8use cloud_llm_client::{
  9    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
 10};
 11use collections::{HashMap, HashSet, hash_map::Entry};
 12use derive_more::Deref;
 13use feature_flags::FeatureFlagAppExt;
 14use futures::{Future, StreamExt, channel::mpsc};
 15use gpui::{
 16    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 17};
 18use http_client::http::{HeaderMap, HeaderValue};
 19use postage::{sink::Sink, watch};
 20use rpc::proto::{RequestMessage, UsersResponse};
 21use std::{
 22    str::FromStr as _,
 23    sync::{Arc, Weak},
 24};
 25use text::ReplicaId;
 26use util::{ResultExt, TryFutureExt as _};
 27
/// Numeric identifier of a user, used as the key of the user cache in this file.
pub type UserId = u64;
 29
/// Newtype wrapper around the raw `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 34
 35impl std::fmt::Display for ChannelId {
 36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 37        self.0.fmt(f)
 38    }
 39}
 40
 41#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 42pub struct ProjectId(pub u64);
 43
 44impl ProjectId {
 45    pub fn to_proto(self) -> u64 {
 46        self.0
 47    }
 48}
 49
/// Newtype wrapper around a `u32` index assigned to a participant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 52
/// Profile information about a user, as cached by `UserStore`.
#[derive(Default, Debug)]
pub struct User {
    /// Identifier of the user; the key of the `UserStore` cache.
    pub id: UserId,
    /// GitHub login of the user; contact lists in this file are ordered by it.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional name of the user.
    pub name: Option<String>,
}
 60
/// Describes one collaborator: the peer and replica they use, the user behind them,
/// and optional committer details.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user this collaborator corresponds to.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Committer name, when known. NOTE(review): presumably the git committer identity — confirm.
    pub committer_name: Option<String>,
    /// Committer email, when known.
    pub committer_email: Option<String>,
}
 70
impl PartialOrd for User {
    /// Always returns `Some`, delegating to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
 76
 77impl Ord for User {
 78    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 79        self.github_login.cmp(&other.github_login)
 80    }
 81}
 82
 83impl PartialEq for User {
 84    fn eq(&self, other: &Self) -> bool {
 85        self.id == other.id && self.github_login == other.github_login
 86    }
 87}
 88
// Marker impl: the `PartialEq` comparison on id and login is a full equivalence relation.
impl Eq for User {}
 90
/// A contact of the current user together with their presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Whether the contact is reported as online.
    pub online: bool,
    /// Whether the contact is reported as busy.
    pub busy: bool,
}
 97
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
105
/// Caches users, contacts, organizations and plan information for the signed-in user,
/// and owns the background tasks that keep this state in sync with the client.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Secondary index from GitHub login to user id.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue of contact updates drained by `_maintain_contacts`.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest known edit-prediction usage; `None` until loaded and after sign-out.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information from the last authenticated-user response; `None` after sign-out.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the currently signed-in user, if any.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Organization currently selected; defaults to the first one returned by the server.
    current_organization: Option<Arc<Organization>>,
    /// Organizations returned by the last authenticated-user response.
    organizations: Vec<Arc<Organization>>,
    /// Plan per organization, consulted by `plan_for_organization`.
    plans_by_organization: HashMap<OrganizationId, Plan>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent the current user a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users the current user sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task applying queued contact updates one at a time.
    _maintain_contacts: Task<()>,
    /// Task reacting to client status changes (sign-in, sign-out, connection loss).
    _maintain_current_user: Task<Result<()>>,
    /// Task that signs the client out whenever a sign-out signal is received.
    _handle_sign_out: Task<()>,
    /// Weak handle to this store's own entity.
    weak_self: WeakEntity<Self>,
}
127
/// Invite information: a count and a URL.
/// NOTE(review): presumably the number of remaining invites and the link to share — confirm.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
133
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received from the server.
    ShowContacts,
    /// Emitted when `set_participant_indices` actually changes the stored indices.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user's info is refreshed and on sign-out.
    PrivateUserInfoUpdated,
    /// Plan-related update.
    PlanUpdated,
}
144
/// What happened to a contact request, carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
151
// Lets `UserStore` emit `Event` values through its context.
impl EventEmitter<Event> for UserStore {}
153
/// Messages placed on the contact-update queue and handled in order by
/// `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, signalling that every earlier
    /// message in the queue has been handled.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
159
/// Usage of edit predictions; dereferences to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
162
/// An amount of usage paired with the limit it counts against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that applies to this usage.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
168
169impl UserStore {
    /// Creates a `UserStore` bound to `client` and spawns its background tasks.
    ///
    /// Three tasks are started and owned by the returned store:
    /// - `_maintain_contacts` drains the contact-update queue, applying one update at a time;
    /// - `_maintain_current_user` follows the client's status stream, loading the
    ///   authenticated user on connect and clearing state on sign-out or connection loss;
    /// - `_handle_sign_out` signs the client out whenever a sign-out signal arrives.
    ///
    /// It also registers handlers on `client` for `UpdateContacts`, `ShowContacts` and
    /// cloud `MessageToClient` messages, and installs the sign-out sender on the client.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        // These subscriptions are moved into the `_maintain_contacts` task below, so they
        // live exactly as long as that task does.
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        // Give the client the sender half; `_handle_sign_out` listens on the receiver.
        client.sign_out_tx.lock().replace(sign_out_tx);
        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_organization: None,
            organizations: Vec::new(),
            plans_by_organization: HashMap::default(),
            plan_info: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                let _subscriptions = rpc_subscriptions;
                // Each update is awaited before the next message is taken, so queued
                // messages are applied strictly in order.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated (it is gone), so stop.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference while waiting so this task does not keep
                // the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers;
                                // a send failure is ignored.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                // Cache the user and apply the rest of the response
                                // (flags, organizations, plan, usage).
                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })?;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Drop all per-account state, then wait until the queued
                            // `Clear` message has been processed.
                            this.update(cx, |this, cx| {
                                this.clear_organizations();
                                this.clear_plan_and_usage();
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Only contacts are cleared here; user, plan and
                            // organizations are kept.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            _handle_sign_out: cx.spawn(async move |this, cx| {
                while let Some(()) = sign_out_rx.next().await {
                    // Stop listening once either the store or the client is gone.
                    let Some(client) = this
                        .read_with(cx, |this, _cx| this.client.upgrade())
                        .ok()
                        .flatten()
                    else {
                        break;
                    };

                    client.sign_out(cx).await;
                }
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
308
309    #[cfg(feature = "test-support")]
310    pub fn clear_cache(&mut self) {
311        self.users.clear();
312        self.by_github_login.clear();
313    }
314
315    async fn handle_show_contacts(
316        this: Entity<Self>,
317        _: TypedEnvelope<proto::ShowContacts>,
318        mut cx: AsyncApp,
319    ) -> Result<()> {
320        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
321        Ok(())
322    }
323
324    async fn handle_update_contacts(
325        this: Entity<Self>,
326        message: TypedEnvelope<proto::UpdateContacts>,
327        cx: AsyncApp,
328    ) -> Result<()> {
329        this.read_with(&cx, |this, _| {
330            this.update_contacts_tx
331                .unbounded_send(UpdateContacts::Update(message.payload))
332                .unwrap();
333        });
334        Ok(())
335    }
336
    /// Applies one queued contact message to the store.
    ///
    /// - `Wait` only drops its barrier sender, which wakes whoever waits on the barrier.
    /// - `Clear` empties contacts and both request lists, then drops its barrier sender.
    /// - `Update` loads every referenced user, then applies removals and upserts to the
    ///   contacts, incoming requests and outgoing requests, keeping each list sorted by
    ///   GitHub login.
    ///
    /// Returns a task that resolves once the message has been fully applied; it fails
    /// if loading users fails or the store can no longer be upgraded.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the distinct ids of every user mentioned by the update so they
                // can be loaded in a single request.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        // (inserting at the binary-search position keeps the list sorted by login)
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        // (each removed request is reported as a `Cancelled` contact event)
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
456
457    pub fn contacts(&self) -> &[Arc<Contact>] {
458        &self.contacts
459    }
460
461    pub fn has_contact(&self, user: &Arc<User>) -> bool {
462        self.contacts
463            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
464            .is_ok()
465    }
466
467    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
468        &self.incoming_contact_requests
469    }
470
471    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
472        &self.outgoing_contact_requests
473    }
474
475    pub fn is_contact_request_pending(&self, user: &User) -> bool {
476        self.pending_contact_requests.contains_key(&user.id)
477    }
478
479    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
480        if self
481            .contacts
482            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
483            .is_ok()
484        {
485            ContactRequestStatus::RequestAccepted
486        } else if self
487            .outgoing_contact_requests
488            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
489            .is_ok()
490        {
491            ContactRequestStatus::RequestSent
492        } else if self
493            .incoming_contact_requests
494            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
495            .is_ok()
496        {
497            ContactRequestStatus::RequestReceived
498        } else {
499            ContactRequestStatus::None
500        }
501    }
502
503    pub fn request_contact(
504        &mut self,
505        responder_id: u64,
506        cx: &mut Context<Self>,
507    ) -> Task<Result<()>> {
508        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
509    }
510
511    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
512        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
513    }
514
515    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
516        self.incoming_contact_requests
517            .iter()
518            .any(|user| user.id == user_id)
519    }
520
521    pub fn respond_to_contact_request(
522        &mut self,
523        requester_id: u64,
524        accept: bool,
525        cx: &mut Context<Self>,
526    ) -> Task<Result<()>> {
527        self.perform_contact_request(
528            requester_id,
529            proto::RespondToContactRequest {
530                requester_id,
531                response: if accept {
532                    proto::ContactRequestResponse::Accept
533                } else {
534                    proto::ContactRequestResponse::Decline
535                } as i32,
536            },
537            cx,
538        )
539    }
540
541    pub fn dismiss_contact_request(
542        &self,
543        requester_id: u64,
544        cx: &Context<Self>,
545    ) -> Task<Result<()>> {
546        let client = self.client.upgrade();
547        cx.spawn(async move |_, _| {
548            client
549                .context("can't upgrade client reference")?
550                .request(proto::RespondToContactRequest {
551                    requester_id,
552                    response: proto::ContactRequestResponse::Dismiss as i32,
553                })
554                .await?;
555            Ok(())
556        })
557    }
558
559    fn perform_contact_request<T: RequestMessage>(
560        &mut self,
561        user_id: u64,
562        request: T,
563        cx: &mut Context<Self>,
564    ) -> Task<Result<()>> {
565        let client = self.client.upgrade();
566        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
567        cx.notify();
568
569        cx.spawn(async move |this, cx| {
570            let response = client
571                .context("can't upgrade client reference")?
572                .request(request)
573                .await;
574            this.update(cx, |this, cx| {
575                if let Entry::Occupied(mut request_count) =
576                    this.pending_contact_requests.entry(user_id)
577                {
578                    *request_count.get_mut() -= 1;
579                    if *request_count.get() == 0 {
580                        request_count.remove();
581                    }
582                }
583                cx.notify();
584            })?;
585            response?;
586            Ok(())
587        })
588    }
589
590    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
591        let (tx, mut rx) = postage::barrier::channel();
592        self.update_contacts_tx
593            .unbounded_send(UpdateContacts::Clear(tx))
594            .unwrap();
595        async move {
596            rx.next().await;
597        }
598    }
599
600    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
601        let (tx, mut rx) = postage::barrier::channel();
602        self.update_contacts_tx
603            .unbounded_send(UpdateContacts::Wait(tx))
604            .unwrap();
605        async move {
606            rx.next().await;
607        }
608    }
609
610    pub fn get_users(
611        &self,
612        user_ids: Vec<u64>,
613        cx: &Context<Self>,
614    ) -> Task<Result<Vec<Arc<User>>>> {
615        let mut user_ids_to_fetch = user_ids.clone();
616        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
617
618        cx.spawn(async move |this, cx| {
619            if !user_ids_to_fetch.is_empty() {
620                this.update(cx, |this, cx| {
621                    this.load_users(
622                        proto::GetUsers {
623                            user_ids: user_ids_to_fetch,
624                        },
625                        cx,
626                    )
627                })?
628                .await?;
629            }
630
631            this.read_with(cx, |this, _| {
632                user_ids
633                    .iter()
634                    .map(|user_id| {
635                        this.users
636                            .get(user_id)
637                            .cloned()
638                            .with_context(|| format!("user {user_id} not found"))
639                    })
640                    .collect()
641            })?
642        })
643    }
644
645    pub fn fuzzy_search_users(
646        &self,
647        query: String,
648        cx: &Context<Self>,
649    ) -> Task<Result<Vec<Arc<User>>>> {
650        self.load_users(proto::FuzzySearchUsers { query }, cx)
651    }
652
653    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
654        self.users.get(&user_id).cloned()
655    }
656
657    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
658        if let Some(user) = self.users.get(&user_id).cloned() {
659            return Some(user);
660        }
661
662        self.get_user(user_id, cx).detach_and_log_err(cx);
663        None
664    }
665
666    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
667        if let Some(user) = self.users.get(&user_id).cloned() {
668            return Task::ready(Ok(user));
669        }
670
671        let load_users = self.get_users(vec![user_id], cx);
672        cx.spawn(async move |this, cx| {
673            load_users.await?;
674            this.read_with(cx, |this, _| {
675                this.users
676                    .get(&user_id)
677                    .cloned()
678                    .context("server responded with no users")
679            })?
680        })
681    }
682
683    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
684        self.by_github_login
685            .get(github_login)
686            .and_then(|id| self.users.get(id).cloned())
687    }
688
689    pub fn current_user(&self) -> Option<Arc<User>> {
690        self.current_user.borrow().clone()
691    }
692
693    pub fn current_organization(&self) -> Option<Arc<Organization>> {
694        self.current_organization.clone()
695    }
696
697    pub fn set_current_organization(&mut self, organization: Arc<Organization>) {
698        self.current_organization.replace(organization);
699    }
700
701    pub fn organizations(&self) -> &Vec<Arc<Organization>> {
702        &self.organizations
703    }
704
705    pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
706        self.plans_by_organization.get(organization_id).copied()
707    }
708
709    pub fn plan(&self) -> Option<Plan> {
710        #[cfg(debug_assertions)]
711        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
712            use cloud_api_client::Plan;
713
714            return match plan.as_str() {
715                "free" => Some(Plan::ZedFree),
716                "trial" => Some(Plan::ZedProTrial),
717                "pro" => Some(Plan::ZedPro),
718                _ => {
719                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
720                }
721            };
722        }
723
724        if let Some(organization) = &self.current_organization
725            && let Some(plan) = self.plan_for_organization(&organization.id)
726        {
727            return Some(plan);
728        }
729
730        self.plan_info.as_ref().map(|info| info.plan())
731    }
732
733    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
734        self.plan_info
735            .as_ref()
736            .and_then(|plan| plan.subscription_period)
737            .map(|subscription_period| {
738                (
739                    subscription_period.started_at.0,
740                    subscription_period.ended_at.0,
741                )
742            })
743    }
744
745    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
746        self.plan_info
747            .as_ref()
748            .and_then(|plan| plan.trial_started_at)
749            .map(|trial_started_at| trial_started_at.0)
750    }
751
752    /// Returns whether the user's account is too new to use the service.
753    pub fn account_too_young(&self) -> bool {
754        self.plan_info
755            .as_ref()
756            .map(|plan| plan.is_account_too_young)
757            .unwrap_or_default()
758    }
759
760    /// Returns whether the current user has overdue invoices and usage should be blocked.
761    pub fn has_overdue_invoices(&self) -> bool {
762        self.plan_info
763            .as_ref()
764            .map(|plan| plan.has_overdue_invoices)
765            .unwrap_or_default()
766    }
767
768    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
769        self.edit_prediction_usage
770    }
771
772    pub fn update_edit_prediction_usage(
773        &mut self,
774        usage: EditPredictionUsage,
775        cx: &mut Context<Self>,
776    ) {
777        self.edit_prediction_usage = Some(usage);
778        cx.notify();
779    }
780
781    pub fn clear_organizations(&mut self) {
782        self.organizations.clear();
783        self.current_organization = None;
784    }
785
786    pub fn clear_plan_and_usage(&mut self) {
787        self.plan_info = None;
788        self.edit_prediction_usage = None;
789    }
790
791    fn update_authenticated_user(
792        &mut self,
793        response: GetAuthenticatedUserResponse,
794        cx: &mut Context<Self>,
795    ) {
796        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
797        cx.update_flags(staff, response.feature_flags);
798        if let Some(client) = self.client.upgrade() {
799            client
800                .telemetry
801                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
802        }
803
804        self.organizations = response.organizations.into_iter().map(Arc::new).collect();
805        self.current_organization = self.organizations.first().cloned();
806
807        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
808            limit: response.plan.usage.edit_predictions.limit,
809            amount: response.plan.usage.edit_predictions.used as i32,
810        }));
811        self.plan_info = Some(response.plan);
812        cx.emit(Event::PrivateUserInfoUpdated);
813    }
814
815    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
816        cx.spawn(async move |cx| {
817            match message {
818                MessageToClient::UserUpdated => {
819                    let cloud_client = cx
820                        .update(|cx| {
821                            this.read_with(cx, |this, _cx| {
822                                this.client.upgrade().map(|client| client.cloud_client())
823                            })
824                        })?
825                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
826
827                    let response = cloud_client.get_authenticated_user().await?;
828                    cx.update(|cx| {
829                        this.update(cx, |this, cx| {
830                            this.update_authenticated_user(response, cx);
831                        })
832                    })?;
833                }
834            }
835
836            anyhow::Ok(())
837        })
838        .detach_and_log_err(cx);
839    }
840
841    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
842        self.current_user.clone()
843    }
844
845    fn load_users(
846        &self,
847        request: impl RequestMessage<Response = UsersResponse>,
848        cx: &Context<Self>,
849    ) -> Task<Result<Vec<Arc<User>>>> {
850        let client = self.client.clone();
851        cx.spawn(async move |this, cx| {
852            if let Some(rpc) = client.upgrade() {
853                let response = rpc.request(request).await.context("error loading users")?;
854                let users = response.users;
855
856                this.update(cx, |this, _| this.insert(users))
857            } else {
858                Ok(Vec::new())
859            }
860        })
861    }
862
863    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
864        let mut ret = Vec::with_capacity(users.len());
865        for user in users {
866            let user = User::new(user);
867            if let Some(old) = self.users.insert(user.id, user.clone())
868                && old.github_login != user.github_login
869            {
870                self.by_github_login.remove(&old.github_login);
871            }
872            self.by_github_login
873                .insert(user.github_login.clone(), user.id);
874            ret.push(user)
875        }
876        ret
877    }
878
879    pub fn set_participant_indices(
880        &mut self,
881        participant_indices: HashMap<u64, ParticipantIndex>,
882        cx: &mut Context<Self>,
883    ) {
884        if participant_indices != self.participant_indices {
885            self.participant_indices = participant_indices;
886            cx.emit(Event::ParticipantIndicesChanged);
887        }
888    }
889
890    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
891        &self.participant_indices
892    }
893
894    pub fn participant_names(
895        &self,
896        user_ids: impl Iterator<Item = u64>,
897        cx: &App,
898    ) -> HashMap<u64, SharedString> {
899        let mut ret = HashMap::default();
900        let mut missing_user_ids = Vec::new();
901        for id in user_ids {
902            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
903                ret.insert(id, github_login);
904            } else {
905                missing_user_ids.push(id)
906            }
907        }
908        if !missing_user_ids.is_empty() {
909            let this = self.weak_self.clone();
910            cx.spawn(async move |cx| {
911                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
912                    .await
913            })
914            .detach_and_log_err(cx);
915        }
916        ret
917    }
918}
919
920impl User {
921    fn new(message: proto::User) -> Arc<Self> {
922        Arc::new(User {
923            id: message.id,
924            github_login: message.github_login.into(),
925            avatar_uri: message.avatar_url.into(),
926            name: message.name,
927        })
928    }
929}
930
931impl Contact {
932    async fn from_proto(
933        contact: proto::Contact,
934        user_store: &Entity<UserStore>,
935        cx: &mut AsyncApp,
936    ) -> Result<Self> {
937        let user = user_store
938            .update(cx, |user_store, cx| {
939                user_store.get_user(contact.user_id, cx)
940            })
941            .await?;
942        Ok(Self {
943            user,
944            online: contact.online,
945            busy: contact.busy,
946        })
947    }
948}
949
950impl Collaborator {
951    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
952        Ok(Self {
953            peer_id: message.peer_id.context("invalid peer id")?,
954            replica_id: ReplicaId::new(message.replica_id as u16),
955            user_id: message.user_id as UserId,
956            is_host: message.is_host,
957            committer_name: message.committer_name,
958            committer_email: message.committer_email,
959        })
960    }
961}
962
963impl RequestUsage {
964    pub fn over_limit(&self) -> bool {
965        match self.limit {
966            UsageLimit::Limited(limit) => self.amount >= limit,
967            UsageLimit::Unlimited => false,
968        }
969    }
970
971    fn from_headers(
972        limit_name: &str,
973        amount_name: &str,
974        headers: &HeaderMap<HeaderValue>,
975    ) -> Result<Self> {
976        let limit = headers
977            .get(limit_name)
978            .with_context(|| format!("missing {limit_name:?} header"))?;
979        let limit = UsageLimit::from_str(limit.to_str()?)?;
980
981        let amount = headers
982            .get(amount_name)
983            .with_context(|| format!("missing {amount_name:?} header"))?;
984        let amount = amount.to_str()?.parse::<i32>()?;
985
986        Ok(Self { limit, amount })
987    }
988}
989
990impl EditPredictionUsage {
991    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
992        Ok(Self(RequestUsage::from_headers(
993            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
994            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
995            headers,
996        )?))
997    }
998}