user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use cloud_api_client::websocket_protocol::MessageToClient;
  5use cloud_api_client::{GetAuthenticatedUserResponse, Organization, Plan, PlanInfo};
  6use cloud_llm_client::{
  7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  8};
  9use collections::{HashMap, HashSet, hash_map::Entry};
 10use derive_more::Deref;
 11use feature_flags::FeatureFlagAppExt;
 12use futures::{Future, StreamExt, channel::mpsc};
 13use gpui::{
 14    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 15};
 16use http_client::http::{HeaderMap, HeaderValue};
 17use postage::{sink::Sink, watch};
 18use rpc::proto::{RequestMessage, UsersResponse};
 19use std::{
 20    str::FromStr as _,
 21    sync::{Arc, Weak},
 22};
 23use text::ReplicaId;
 24use util::{ResultExt, TryFutureExt as _};
 25
 26pub type UserId = u64; // Numeric identifier of a user; this is the type of `User::id`.
 27
 28#[derive(
 29    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 30)]
 31pub struct ChannelId(pub u64);
 32
 33impl std::fmt::Display for ChannelId {
 34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 35        self.0.fmt(f)
 36    }
 37}
 38
 39#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 40pub struct ProjectId(pub u64);
 41
 42impl ProjectId {
 43    pub fn to_proto(self) -> u64 {
 44        self.0
 45    }
 46}
 47
 48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 49pub struct ParticipantIndex(pub u32);
 50
 51#[derive(Default, Debug)]
 52pub struct User {
 53    pub id: UserId,
 54    pub github_login: SharedString,
 55    pub avatar_uri: SharedUri,
 56    pub name: Option<String>,
 57}
 58
 59#[derive(Clone, Debug, PartialEq, Eq)]
 60pub struct Collaborator {
 61    pub peer_id: proto::PeerId,
 62    pub replica_id: ReplicaId,
 63    pub user_id: UserId,
 64    pub is_host: bool,
 65    pub committer_name: Option<String>,
 66    pub committer_email: Option<String>,
 67}
 68
 69impl PartialOrd for User {
 70    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 71        Some(self.cmp(other))
 72    }
 73}
 74
 75impl Ord for User {
 76    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 77        self.github_login.cmp(&other.github_login)
 78    }
 79}
 80
 81impl PartialEq for User {
 82    fn eq(&self, other: &Self) -> bool {
 83        self.id == other.id && self.github_login == other.github_login
 84    }
 85}
 86
 87impl Eq for User {}
 88
 89#[derive(Debug, PartialEq)]
 90pub struct Contact {
 91    pub user: Arc<User>,
 92    pub online: bool,
 93    pub busy: bool,
 94}
 95
 96#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 97pub enum ContactRequestStatus {
 98    None,
 99    RequestSent,
100    RequestReceived,
101    RequestAccepted,
102}
103
104pub struct UserStore {
105    users: HashMap<u64, Arc<User>>,
106    by_github_login: HashMap<SharedString, u64>,
107    participant_indices: HashMap<u64, ParticipantIndex>,
108    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
109    edit_prediction_usage: Option<EditPredictionUsage>,
110    plan_info: Option<PlanInfo>,
111    current_user: watch::Receiver<Option<Arc<User>>>,
112    current_organization: Option<Arc<Organization>>,
113    organizations: Vec<Arc<Organization>>,
114    contacts: Vec<Arc<Contact>>,
115    incoming_contact_requests: Vec<Arc<User>>,
116    outgoing_contact_requests: Vec<Arc<User>>,
117    pending_contact_requests: HashMap<u64, usize>,
118    client: Weak<Client>,
119    _maintain_contacts: Task<()>,
120    _maintain_current_user: Task<Result<()>>,
121    _handle_sign_out: Task<()>,
122    weak_self: WeakEntity<Self>,
123}
124
/// Invite information: a count together with a URL.
#[derive(Clone)]
pub struct InviteInfo {
    /// Invite count.
    pub count: u32,
    /// Invite URL.
    pub url: Arc<str>,
}
130
131pub enum Event {
132    Contact {
133        user: Arc<User>,
134        kind: ContactEventKind,
135    },
136    ShowContacts,
137    ParticipantIndicesChanged,
138    PrivateUserInfoUpdated,
139    PlanUpdated,
140}
141
142#[derive(Clone, Copy)]
143pub enum ContactEventKind {
144    Requested,
145    Accepted,
146    Cancelled,
147}
148
149impl EventEmitter<Event> for UserStore {}
150
151enum UpdateContacts {
152    Update(proto::UpdateContacts),
153    Wait(postage::barrier::Sender),
154    Clear(postage::barrier::Sender),
155}
156
157#[derive(Debug, Clone, Copy, Deref)]
158pub struct EditPredictionUsage(pub RequestUsage);
159
160#[derive(Debug, Clone, Copy)]
161pub struct RequestUsage {
162    pub limit: UsageLimit,
163    pub amount: i32,
164}
165
166impl UserStore {
167    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
168        let (mut current_user_tx, current_user_rx) = watch::channel();
169        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
170        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
171        let rpc_subscriptions = vec![
172            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
173            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
174        ];
175
176        client.sign_out_tx.lock().replace(sign_out_tx);
177        client.add_message_to_client_handler({
178            let this = cx.weak_entity();
179            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
180        });
181
182        Self {
183            users: Default::default(),
184            by_github_login: Default::default(),
185            current_user: current_user_rx,
186            current_organization: None,
187            organizations: Vec::new(),
188            plan_info: None,
189            edit_prediction_usage: None,
190            contacts: Default::default(),
191            incoming_contact_requests: Default::default(),
192            participant_indices: Default::default(),
193            outgoing_contact_requests: Default::default(),
194            client: Arc::downgrade(&client),
195            update_contacts_tx,
196            _maintain_contacts: cx.spawn(async move |this, cx| {
197                let _subscriptions = rpc_subscriptions;
198                while let Some(message) = update_contacts_rx.next().await {
199                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
200                    {
201                        task.log_err().await;
202                    } else {
203                        break;
204                    }
205                }
206            }),
207            _maintain_current_user: cx.spawn(async move |this, cx| {
208                let mut status = client.status();
209                let weak = Arc::downgrade(&client);
210                drop(client);
211                while let Some(status) = status.next().await {
212                    // if the client is dropped, the app is shutting down.
213                    let Some(client) = weak.upgrade() else {
214                        return Ok(());
215                    };
216                    match status {
217                        Status::Authenticated
218                        | Status::Reauthenticated
219                        | Status::Connected { .. } => {
220                            if let Some(user_id) = client.user_id() {
221                                let response = client
222                                    .cloud_client()
223                                    .get_authenticated_user()
224                                    .await
225                                    .log_err();
226
227                                let current_user_and_response = if let Some(response) = response {
228                                    let user = Arc::new(User {
229                                        id: user_id,
230                                        github_login: response.user.github_login.clone().into(),
231                                        avatar_uri: response.user.avatar_url.clone().into(),
232                                        name: response.user.name.clone(),
233                                    });
234
235                                    Some((user, response))
236                                } else {
237                                    None
238                                };
239                                current_user_tx
240                                    .send(
241                                        current_user_and_response
242                                            .as_ref()
243                                            .map(|(user, _)| user.clone()),
244                                    )
245                                    .await
246                                    .ok();
247
248                                cx.update(|cx| {
249                                    if let Some((user, response)) = current_user_and_response {
250                                        this.update(cx, |this, cx| {
251                                            this.by_github_login
252                                                .insert(user.github_login.clone(), user_id);
253                                            this.users.insert(user_id, user);
254                                            this.update_authenticated_user(response, cx)
255                                        })
256                                    } else {
257                                        anyhow::Ok(())
258                                    }
259                                })?;
260
261                                this.update(cx, |_, cx| cx.notify())?;
262                            }
263                        }
264                        Status::SignedOut => {
265                            current_user_tx.send(None).await.ok();
266                            this.update(cx, |this, cx| {
267                                this.clear_organizations();
268                                this.clear_plan_and_usage();
269                                cx.emit(Event::PrivateUserInfoUpdated);
270                                cx.notify();
271                                this.clear_contacts()
272                            })?
273                            .await;
274                        }
275                        Status::ConnectionLost => {
276                            this.update(cx, |this, cx| {
277                                cx.notify();
278                                this.clear_contacts()
279                            })?
280                            .await;
281                        }
282                        _ => {}
283                    }
284                }
285                Ok(())
286            }),
287            _handle_sign_out: cx.spawn(async move |this, cx| {
288                while let Some(()) = sign_out_rx.next().await {
289                    let Some(client) = this
290                        .read_with(cx, |this, _cx| this.client.upgrade())
291                        .ok()
292                        .flatten()
293                    else {
294                        break;
295                    };
296
297                    client.sign_out(cx).await;
298                }
299            }),
300            pending_contact_requests: Default::default(),
301            weak_self: cx.weak_entity(),
302        }
303    }
304
305    #[cfg(feature = "test-support")]
306    pub fn clear_cache(&mut self) {
307        self.users.clear();
308        self.by_github_login.clear();
309    }
310
311    async fn handle_show_contacts(
312        this: Entity<Self>,
313        _: TypedEnvelope<proto::ShowContacts>,
314        mut cx: AsyncApp,
315    ) -> Result<()> {
316        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
317        Ok(())
318    }
319
320    async fn handle_update_contacts(
321        this: Entity<Self>,
322        message: TypedEnvelope<proto::UpdateContacts>,
323        cx: AsyncApp,
324    ) -> Result<()> {
325        this.read_with(&cx, |this, _| {
326            this.update_contacts_tx
327                .unbounded_send(UpdateContacts::Update(message.payload))
328                .unwrap();
329        });
330        Ok(())
331    }
332
333    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
334        match message {
335            UpdateContacts::Wait(barrier) => {
336                drop(barrier);
337                Task::ready(Ok(()))
338            }
339            UpdateContacts::Clear(barrier) => {
340                self.contacts.clear();
341                self.incoming_contact_requests.clear();
342                self.outgoing_contact_requests.clear();
343                drop(barrier);
344                Task::ready(Ok(()))
345            }
346            UpdateContacts::Update(message) => {
347                let mut user_ids = HashSet::default();
348                for contact in &message.contacts {
349                    user_ids.insert(contact.user_id);
350                }
351                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
352                user_ids.extend(message.outgoing_requests.iter());
353
354                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
355                cx.spawn(async move |this, cx| {
356                    load_users.await?;
357
358                    // Users are fetched in parallel above and cached in call to get_users
359                    // No need to parallelize here
360                    let mut updated_contacts = Vec::new();
361                    let this = this.upgrade().context("can't upgrade user store handle")?;
362                    for contact in message.contacts {
363                        updated_contacts
364                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
365                    }
366
367                    let mut incoming_requests = Vec::new();
368                    for request in message.incoming_requests {
369                        incoming_requests.push({
370                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
371                                .await?
372                        });
373                    }
374
375                    let mut outgoing_requests = Vec::new();
376                    for requested_user_id in message.outgoing_requests {
377                        outgoing_requests.push(
378                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
379                                .await?,
380                        );
381                    }
382
383                    let removed_contacts =
384                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
385                    let removed_incoming_requests =
386                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
387                    let removed_outgoing_requests =
388                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
389
390                    this.update(cx, |this, cx| {
391                        // Remove contacts
392                        this.contacts
393                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
394                        // Update existing contacts and insert new ones
395                        for updated_contact in updated_contacts {
396                            match this.contacts.binary_search_by_key(
397                                &&updated_contact.user.github_login,
398                                |contact| &contact.user.github_login,
399                            ) {
400                                Ok(ix) => this.contacts[ix] = updated_contact,
401                                Err(ix) => this.contacts.insert(ix, updated_contact),
402                            }
403                        }
404
405                        // Remove incoming contact requests
406                        this.incoming_contact_requests.retain(|user| {
407                            if removed_incoming_requests.contains(&user.id) {
408                                cx.emit(Event::Contact {
409                                    user: user.clone(),
410                                    kind: ContactEventKind::Cancelled,
411                                });
412                                false
413                            } else {
414                                true
415                            }
416                        });
417                        // Update existing incoming requests and insert new ones
418                        for user in incoming_requests {
419                            match this
420                                .incoming_contact_requests
421                                .binary_search_by_key(&&user.github_login, |contact| {
422                                    &contact.github_login
423                                }) {
424                                Ok(ix) => this.incoming_contact_requests[ix] = user,
425                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
426                            }
427                        }
428
429                        // Remove outgoing contact requests
430                        this.outgoing_contact_requests
431                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
432                        // Update existing incoming requests and insert new ones
433                        for request in outgoing_requests {
434                            match this
435                                .outgoing_contact_requests
436                                .binary_search_by_key(&&request.github_login, |contact| {
437                                    &contact.github_login
438                                }) {
439                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
440                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
441                            }
442                        }
443
444                        cx.notify();
445                    });
446
447                    Ok(())
448                })
449            }
450        }
451    }
452
453    pub fn contacts(&self) -> &[Arc<Contact>] {
454        &self.contacts
455    }
456
457    pub fn has_contact(&self, user: &Arc<User>) -> bool {
458        self.contacts
459            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
460            .is_ok()
461    }
462
463    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
464        &self.incoming_contact_requests
465    }
466
467    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
468        &self.outgoing_contact_requests
469    }
470
471    pub fn is_contact_request_pending(&self, user: &User) -> bool {
472        self.pending_contact_requests.contains_key(&user.id)
473    }
474
475    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
476        if self
477            .contacts
478            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
479            .is_ok()
480        {
481            ContactRequestStatus::RequestAccepted
482        } else if self
483            .outgoing_contact_requests
484            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
485            .is_ok()
486        {
487            ContactRequestStatus::RequestSent
488        } else if self
489            .incoming_contact_requests
490            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
491            .is_ok()
492        {
493            ContactRequestStatus::RequestReceived
494        } else {
495            ContactRequestStatus::None
496        }
497    }
498
499    pub fn request_contact(
500        &mut self,
501        responder_id: u64,
502        cx: &mut Context<Self>,
503    ) -> Task<Result<()>> {
504        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
505    }
506
507    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
508        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
509    }
510
511    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
512        self.incoming_contact_requests
513            .iter()
514            .any(|user| user.id == user_id)
515    }
516
517    pub fn respond_to_contact_request(
518        &mut self,
519        requester_id: u64,
520        accept: bool,
521        cx: &mut Context<Self>,
522    ) -> Task<Result<()>> {
523        self.perform_contact_request(
524            requester_id,
525            proto::RespondToContactRequest {
526                requester_id,
527                response: if accept {
528                    proto::ContactRequestResponse::Accept
529                } else {
530                    proto::ContactRequestResponse::Decline
531                } as i32,
532            },
533            cx,
534        )
535    }
536
537    pub fn dismiss_contact_request(
538        &self,
539        requester_id: u64,
540        cx: &Context<Self>,
541    ) -> Task<Result<()>> {
542        let client = self.client.upgrade();
543        cx.spawn(async move |_, _| {
544            client
545                .context("can't upgrade client reference")?
546                .request(proto::RespondToContactRequest {
547                    requester_id,
548                    response: proto::ContactRequestResponse::Dismiss as i32,
549                })
550                .await?;
551            Ok(())
552        })
553    }
554
555    fn perform_contact_request<T: RequestMessage>(
556        &mut self,
557        user_id: u64,
558        request: T,
559        cx: &mut Context<Self>,
560    ) -> Task<Result<()>> {
561        let client = self.client.upgrade();
562        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
563        cx.notify();
564
565        cx.spawn(async move |this, cx| {
566            let response = client
567                .context("can't upgrade client reference")?
568                .request(request)
569                .await;
570            this.update(cx, |this, cx| {
571                if let Entry::Occupied(mut request_count) =
572                    this.pending_contact_requests.entry(user_id)
573                {
574                    *request_count.get_mut() -= 1;
575                    if *request_count.get() == 0 {
576                        request_count.remove();
577                    }
578                }
579                cx.notify();
580            })?;
581            response?;
582            Ok(())
583        })
584    }
585
586    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
587        let (tx, mut rx) = postage::barrier::channel();
588        self.update_contacts_tx
589            .unbounded_send(UpdateContacts::Clear(tx))
590            .unwrap();
591        async move {
592            rx.next().await;
593        }
594    }
595
596    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
597        let (tx, mut rx) = postage::barrier::channel();
598        self.update_contacts_tx
599            .unbounded_send(UpdateContacts::Wait(tx))
600            .unwrap();
601        async move {
602            rx.next().await;
603        }
604    }
605
606    pub fn get_users(
607        &self,
608        user_ids: Vec<u64>,
609        cx: &Context<Self>,
610    ) -> Task<Result<Vec<Arc<User>>>> {
611        let mut user_ids_to_fetch = user_ids.clone();
612        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
613
614        cx.spawn(async move |this, cx| {
615            if !user_ids_to_fetch.is_empty() {
616                this.update(cx, |this, cx| {
617                    this.load_users(
618                        proto::GetUsers {
619                            user_ids: user_ids_to_fetch,
620                        },
621                        cx,
622                    )
623                })?
624                .await?;
625            }
626
627            this.read_with(cx, |this, _| {
628                user_ids
629                    .iter()
630                    .map(|user_id| {
631                        this.users
632                            .get(user_id)
633                            .cloned()
634                            .with_context(|| format!("user {user_id} not found"))
635                    })
636                    .collect()
637            })?
638        })
639    }
640
641    pub fn fuzzy_search_users(
642        &self,
643        query: String,
644        cx: &Context<Self>,
645    ) -> Task<Result<Vec<Arc<User>>>> {
646        self.load_users(proto::FuzzySearchUsers { query }, cx)
647    }
648
649    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
650        self.users.get(&user_id).cloned()
651    }
652
653    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
654        if let Some(user) = self.users.get(&user_id).cloned() {
655            return Some(user);
656        }
657
658        self.get_user(user_id, cx).detach_and_log_err(cx);
659        None
660    }
661
662    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
663        if let Some(user) = self.users.get(&user_id).cloned() {
664            return Task::ready(Ok(user));
665        }
666
667        let load_users = self.get_users(vec![user_id], cx);
668        cx.spawn(async move |this, cx| {
669            load_users.await?;
670            this.read_with(cx, |this, _| {
671                this.users
672                    .get(&user_id)
673                    .cloned()
674                    .context("server responded with no users")
675            })?
676        })
677    }
678
679    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
680        self.by_github_login
681            .get(github_login)
682            .and_then(|id| self.users.get(id).cloned())
683    }
684
685    pub fn current_user(&self) -> Option<Arc<User>> {
686        self.current_user.borrow().clone()
687    }
688
689    pub fn current_organization(&self) -> Option<Arc<Organization>> {
690        self.current_organization.clone()
691    }
692
693    pub fn plan(&self) -> Option<Plan> {
694        #[cfg(debug_assertions)]
695        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
696            use cloud_api_client::Plan;
697
698            return match plan.as_str() {
699                "free" => Some(Plan::ZedFree),
700                "trial" => Some(Plan::ZedProTrial),
701                "pro" => Some(Plan::ZedPro),
702                _ => {
703                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
704                }
705            };
706        }
707
708        self.plan_info.as_ref().map(|info| info.plan())
709    }
710
711    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
712        self.plan_info
713            .as_ref()
714            .and_then(|plan| plan.subscription_period)
715            .map(|subscription_period| {
716                (
717                    subscription_period.started_at.0,
718                    subscription_period.ended_at.0,
719                )
720            })
721    }
722
723    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
724        self.plan_info
725            .as_ref()
726            .and_then(|plan| plan.trial_started_at)
727            .map(|trial_started_at| trial_started_at.0)
728    }
729
730    /// Returns whether the user's account is too new to use the service.
731    pub fn account_too_young(&self) -> bool {
732        self.plan_info
733            .as_ref()
734            .map(|plan| plan.is_account_too_young)
735            .unwrap_or_default()
736    }
737
738    /// Returns whether the current user has overdue invoices and usage should be blocked.
739    pub fn has_overdue_invoices(&self) -> bool {
740        self.plan_info
741            .as_ref()
742            .map(|plan| plan.has_overdue_invoices)
743            .unwrap_or_default()
744    }
745
746    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
747        self.edit_prediction_usage
748    }
749
750    pub fn update_edit_prediction_usage(
751        &mut self,
752        usage: EditPredictionUsage,
753        cx: &mut Context<Self>,
754    ) {
755        self.edit_prediction_usage = Some(usage);
756        cx.notify();
757    }
758
759    pub fn clear_organizations(&mut self) {
760        self.organizations.clear();
761        self.current_organization = None;
762    }
763
764    pub fn clear_plan_and_usage(&mut self) {
765        self.plan_info = None;
766        self.edit_prediction_usage = None;
767    }
768
769    fn update_authenticated_user(
770        &mut self,
771        response: GetAuthenticatedUserResponse,
772        cx: &mut Context<Self>,
773    ) {
774        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
775        cx.update_flags(staff, response.feature_flags);
776        if let Some(client) = self.client.upgrade() {
777            client
778                .telemetry
779                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
780        }
781
782        self.organizations = response.organizations.into_iter().map(Arc::new).collect();
783        self.current_organization = self.organizations.first().cloned();
784
785        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
786            limit: response.plan.usage.edit_predictions.limit,
787            amount: response.plan.usage.edit_predictions.used as i32,
788        }));
789        self.plan_info = Some(response.plan);
790        cx.emit(Event::PrivateUserInfoUpdated);
791    }
792
793    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
794        cx.spawn(async move |cx| {
795            match message {
796                MessageToClient::UserUpdated => {
797                    let cloud_client = cx
798                        .update(|cx| {
799                            this.read_with(cx, |this, _cx| {
800                                this.client.upgrade().map(|client| client.cloud_client())
801                            })
802                        })?
803                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
804
805                    let response = cloud_client.get_authenticated_user().await?;
806                    cx.update(|cx| {
807                        this.update(cx, |this, cx| {
808                            this.update_authenticated_user(response, cx);
809                        })
810                    })?;
811                }
812            }
813
814            anyhow::Ok(())
815        })
816        .detach_and_log_err(cx);
817    }
818
819    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
820        self.current_user.clone()
821    }
822
823    fn load_users(
824        &self,
825        request: impl RequestMessage<Response = UsersResponse>,
826        cx: &Context<Self>,
827    ) -> Task<Result<Vec<Arc<User>>>> {
828        let client = self.client.clone();
829        cx.spawn(async move |this, cx| {
830            if let Some(rpc) = client.upgrade() {
831                let response = rpc.request(request).await.context("error loading users")?;
832                let users = response.users;
833
834                this.update(cx, |this, _| this.insert(users))
835            } else {
836                Ok(Vec::new())
837            }
838        })
839    }
840
841    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
842        let mut ret = Vec::with_capacity(users.len());
843        for user in users {
844            let user = User::new(user);
845            if let Some(old) = self.users.insert(user.id, user.clone())
846                && old.github_login != user.github_login
847            {
848                self.by_github_login.remove(&old.github_login);
849            }
850            self.by_github_login
851                .insert(user.github_login.clone(), user.id);
852            ret.push(user)
853        }
854        ret
855    }
856
857    pub fn set_participant_indices(
858        &mut self,
859        participant_indices: HashMap<u64, ParticipantIndex>,
860        cx: &mut Context<Self>,
861    ) {
862        if participant_indices != self.participant_indices {
863            self.participant_indices = participant_indices;
864            cx.emit(Event::ParticipantIndicesChanged);
865        }
866    }
867
868    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
869        &self.participant_indices
870    }
871
872    pub fn participant_names(
873        &self,
874        user_ids: impl Iterator<Item = u64>,
875        cx: &App,
876    ) -> HashMap<u64, SharedString> {
877        let mut ret = HashMap::default();
878        let mut missing_user_ids = Vec::new();
879        for id in user_ids {
880            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
881                ret.insert(id, github_login);
882            } else {
883                missing_user_ids.push(id)
884            }
885        }
886        if !missing_user_ids.is_empty() {
887            let this = self.weak_self.clone();
888            cx.spawn(async move |cx| {
889                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
890                    .await
891            })
892            .detach_and_log_err(cx);
893        }
894        ret
895    }
896}
897
898impl User {
899    fn new(message: proto::User) -> Arc<Self> {
900        Arc::new(User {
901            id: message.id,
902            github_login: message.github_login.into(),
903            avatar_uri: message.avatar_url.into(),
904            name: message.name,
905        })
906    }
907}
908
909impl Contact {
910    async fn from_proto(
911        contact: proto::Contact,
912        user_store: &Entity<UserStore>,
913        cx: &mut AsyncApp,
914    ) -> Result<Self> {
915        let user = user_store
916            .update(cx, |user_store, cx| {
917                user_store.get_user(contact.user_id, cx)
918            })
919            .await?;
920        Ok(Self {
921            user,
922            online: contact.online,
923            busy: contact.busy,
924        })
925    }
926}
927
928impl Collaborator {
929    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
930        Ok(Self {
931            peer_id: message.peer_id.context("invalid peer id")?,
932            replica_id: ReplicaId::new(message.replica_id as u16),
933            user_id: message.user_id as UserId,
934            is_host: message.is_host,
935            committer_name: message.committer_name,
936            committer_email: message.committer_email,
937        })
938    }
939}
940
941impl RequestUsage {
942    pub fn over_limit(&self) -> bool {
943        match self.limit {
944            UsageLimit::Limited(limit) => self.amount >= limit,
945            UsageLimit::Unlimited => false,
946        }
947    }
948
949    fn from_headers(
950        limit_name: &str,
951        amount_name: &str,
952        headers: &HeaderMap<HeaderValue>,
953    ) -> Result<Self> {
954        let limit = headers
955            .get(limit_name)
956            .with_context(|| format!("missing {limit_name:?} header"))?;
957        let limit = UsageLimit::from_str(limit.to_str()?)?;
958
959        let amount = headers
960            .get(amount_name)
961            .with_context(|| format!("missing {amount_name:?} header"))?;
962        let amount = amount.to_str()?.parse::<i32>()?;
963
964        Ok(Self { limit, amount })
965    }
966}
967
968impl EditPredictionUsage {
969    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
970        Ok(Self(RequestUsage::from_headers(
971            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
972            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
973            headers,
974        )?))
975    }
976}