user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use cloud_api_client::websocket_protocol::MessageToClient;
  5use cloud_api_client::{GetAuthenticatedUserResponse, Organization, Plan, PlanInfo};
  6use cloud_llm_client::{
  7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  8};
  9use collections::{HashMap, HashSet, hash_map::Entry};
 10use derive_more::Deref;
 11use feature_flags::FeatureFlagAppExt;
 12use futures::{Future, StreamExt, channel::mpsc};
 13use gpui::{
 14    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
 15};
 16use http_client::http::{HeaderMap, HeaderValue};
 17use postage::{sink::Sink, watch};
 18use rpc::proto::{RequestMessage, UsersResponse};
 19use std::{
 20    str::FromStr as _,
 21    sync::{Arc, Weak},
 22};
 23use text::ReplicaId;
 24use util::{ResultExt, TryFutureExt as _};
 25
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
 27
/// Newtype wrapper around a `u64` channel identifier.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 32
 33impl std::fmt::Display for ChannelId {
 34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 35        self.0.fmt(f)
 36    }
 37}
 38
/// Newtype wrapper around a `u64` project identifier.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
 41
 42impl ProjectId {
 43    pub fn to_proto(self) -> u64 {
 44        self.0
 45    }
 46}
 47
/// Newtype wrapper around a `u32` index assigned to a participant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 50
/// A user record as cached by [`UserStore`].
///
/// Equality compares `id` and `github_login`; see the manual `PartialEq` and
/// `Ord` implementations for the exact comparison rules.
#[derive(Default, Debug)]
pub struct User {
    /// Identifier under which the user is cached.
    pub id: UserId,
    /// Login string; the key used when sorting and binary-searching contact lists.
    pub github_login: SharedString,
    /// URI of the user's avatar.
    pub avatar_uri: SharedUri,
    /// Optional name of the user.
    pub name: Option<String>,
}
 58
/// A collaborator decoded from a `proto::Collaborator` message
/// (see [`Collaborator::from_proto`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the protocol message.
    pub peer_id: proto::PeerId,
    /// Replica id taken from the protocol message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Host flag copied from the protocol message.
    pub is_host: bool,
    /// Optional committer name copied from the protocol message.
    pub committer_name: Option<String>,
    /// Optional committer email copied from the protocol message.
    pub committer_email: Option<String>,
}
 68
 69impl PartialOrd for User {
 70    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 71        Some(self.cmp(other))
 72    }
 73}
 74
 75impl Ord for User {
 76    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 77        self.github_login.cmp(&other.github_login)
 78    }
 79}
 80
 81impl PartialEq for User {
 82    fn eq(&self, other: &Self) -> bool {
 83        self.id == other.id && self.github_login == other.github_login
 84    }
 85}
 86
 87impl Eq for User {}
 88
/// A contact entry: a user plus the `online`/`busy` flags received in a
/// `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the protocol message.
    pub online: bool,
    /// `busy` flag copied from the protocol message.
    pub busy: bool,
}
 95
/// Relationship between the local store and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
103
/// Caches users, contacts and contact requests, and tracks the current user's
/// organizations, plan information and edit prediction usage.
pub struct UserStore {
    /// Cache of known users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from login to user id, kept alongside `users`.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest known edit prediction usage, if any.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information from the last authenticated-user response, if any.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the current user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// First entry of `organizations`, if any.
    current_organization: Option<Arc<Organization>>,
    /// Organizations from the last authenticated-user response.
    organizations: Vec<Arc<Organization>>,
    /// Contacts, kept sorted by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak handle to the client so the store does not keep it alive.
    client: Weak<Client>,
    /// Background task that applies queued `UpdateContacts` messages in order.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store's own entity.
    weak_self: WeakEntity<Self>,
}
123
/// Invite information: a count together with a URL.
// NOTE(review): not referenced elsewhere in this file; exact semantics of `count` should be
// confirmed against callers.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
129
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`; `kind` says what happened.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different map.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user info is refreshed, and on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): not emitted anywhere in this file.
    PlanUpdated,
}
140
/// The kind of change carried by [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
147
// Allows `UserStore` to emit `Event` values through its context.
impl EventEmitter<Event> for UserStore {}
149
/// Messages processed sequentially by the contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, signalling that all earlier messages were processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
155
/// Edit prediction usage; a newtype over [`RequestUsage`] that derefs to it.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
158
/// A usage amount paired with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The applicable limit (limited to a value, or unlimited).
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
164
165impl UserStore {
    /// Creates a store bound to `client`.
    ///
    /// Registers message handlers on the client and spawns two background tasks:
    /// one that applies queued contact updates one at a time, and one that tracks
    /// the client's status to keep the current user, organizations, plan and
    /// contacts in sync.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_organization: None,
            organizations: Vec::new(),
            plan_info: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Keep the RPC subscriptions alive for as long as this task runs.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Each update is awaited before the next one is taken from the queue.
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no current user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        // Cache the user and apply the rest of the response.
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })?;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Drop all per-user state, then wait for the contacts to be cleared.
                            this.update(cx, |this, cx| {
                                this.clear_organizations();
                                this.clear_plan_and_usage();
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Only the contacts are cleared; user, plan and organizations are kept.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
288
289    #[cfg(feature = "test-support")]
290    pub fn clear_cache(&mut self) {
291        self.users.clear();
292        self.by_github_login.clear();
293    }
294
295    async fn handle_show_contacts(
296        this: Entity<Self>,
297        _: TypedEnvelope<proto::ShowContacts>,
298        mut cx: AsyncApp,
299    ) -> Result<()> {
300        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
301        Ok(())
302    }
303
304    async fn handle_update_contacts(
305        this: Entity<Self>,
306        message: TypedEnvelope<proto::UpdateContacts>,
307        cx: AsyncApp,
308    ) -> Result<()> {
309        this.read_with(&cx, |this, _| {
310            this.update_contacts_tx
311                .unbounded_send(UpdateContacts::Update(message.payload))
312                .unwrap();
313        });
314        Ok(())
315    }
316
    /// Applies a single queued [`UpdateContacts`] message.
    ///
    /// `Wait` and `Clear` complete immediately and signal completion by dropping
    /// their barrier. `Update` first loads every referenced user, then applies
    /// removals and insertions to the contact list and both request lists, keeping
    /// each list sorted by `github_login`.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be loaded in one go.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                // Notify listeners that this incoming request went away.
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
436
437    pub fn contacts(&self) -> &[Arc<Contact>] {
438        &self.contacts
439    }
440
441    pub fn has_contact(&self, user: &Arc<User>) -> bool {
442        self.contacts
443            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
444            .is_ok()
445    }
446
447    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
448        &self.incoming_contact_requests
449    }
450
451    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
452        &self.outgoing_contact_requests
453    }
454
455    pub fn is_contact_request_pending(&self, user: &User) -> bool {
456        self.pending_contact_requests.contains_key(&user.id)
457    }
458
459    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
460        if self
461            .contacts
462            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
463            .is_ok()
464        {
465            ContactRequestStatus::RequestAccepted
466        } else if self
467            .outgoing_contact_requests
468            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
469            .is_ok()
470        {
471            ContactRequestStatus::RequestSent
472        } else if self
473            .incoming_contact_requests
474            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
475            .is_ok()
476        {
477            ContactRequestStatus::RequestReceived
478        } else {
479            ContactRequestStatus::None
480        }
481    }
482
483    pub fn request_contact(
484        &mut self,
485        responder_id: u64,
486        cx: &mut Context<Self>,
487    ) -> Task<Result<()>> {
488        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
489    }
490
491    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
492        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
493    }
494
495    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
496        self.incoming_contact_requests
497            .iter()
498            .any(|user| user.id == user_id)
499    }
500
501    pub fn respond_to_contact_request(
502        &mut self,
503        requester_id: u64,
504        accept: bool,
505        cx: &mut Context<Self>,
506    ) -> Task<Result<()>> {
507        self.perform_contact_request(
508            requester_id,
509            proto::RespondToContactRequest {
510                requester_id,
511                response: if accept {
512                    proto::ContactRequestResponse::Accept
513                } else {
514                    proto::ContactRequestResponse::Decline
515                } as i32,
516            },
517            cx,
518        )
519    }
520
521    pub fn dismiss_contact_request(
522        &self,
523        requester_id: u64,
524        cx: &Context<Self>,
525    ) -> Task<Result<()>> {
526        let client = self.client.upgrade();
527        cx.spawn(async move |_, _| {
528            client
529                .context("can't upgrade client reference")?
530                .request(proto::RespondToContactRequest {
531                    requester_id,
532                    response: proto::ContactRequestResponse::Dismiss as i32,
533                })
534                .await?;
535            Ok(())
536        })
537    }
538
539    fn perform_contact_request<T: RequestMessage>(
540        &mut self,
541        user_id: u64,
542        request: T,
543        cx: &mut Context<Self>,
544    ) -> Task<Result<()>> {
545        let client = self.client.upgrade();
546        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
547        cx.notify();
548
549        cx.spawn(async move |this, cx| {
550            let response = client
551                .context("can't upgrade client reference")?
552                .request(request)
553                .await;
554            this.update(cx, |this, cx| {
555                if let Entry::Occupied(mut request_count) =
556                    this.pending_contact_requests.entry(user_id)
557                {
558                    *request_count.get_mut() -= 1;
559                    if *request_count.get() == 0 {
560                        request_count.remove();
561                    }
562                }
563                cx.notify();
564            })?;
565            response?;
566            Ok(())
567        })
568    }
569
570    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
571        let (tx, mut rx) = postage::barrier::channel();
572        self.update_contacts_tx
573            .unbounded_send(UpdateContacts::Clear(tx))
574            .unwrap();
575        async move {
576            rx.next().await;
577        }
578    }
579
580    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
581        let (tx, mut rx) = postage::barrier::channel();
582        self.update_contacts_tx
583            .unbounded_send(UpdateContacts::Wait(tx))
584            .unwrap();
585        async move {
586            rx.next().await;
587        }
588    }
589
590    pub fn get_users(
591        &self,
592        user_ids: Vec<u64>,
593        cx: &Context<Self>,
594    ) -> Task<Result<Vec<Arc<User>>>> {
595        let mut user_ids_to_fetch = user_ids.clone();
596        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
597
598        cx.spawn(async move |this, cx| {
599            if !user_ids_to_fetch.is_empty() {
600                this.update(cx, |this, cx| {
601                    this.load_users(
602                        proto::GetUsers {
603                            user_ids: user_ids_to_fetch,
604                        },
605                        cx,
606                    )
607                })?
608                .await?;
609            }
610
611            this.read_with(cx, |this, _| {
612                user_ids
613                    .iter()
614                    .map(|user_id| {
615                        this.users
616                            .get(user_id)
617                            .cloned()
618                            .with_context(|| format!("user {user_id} not found"))
619                    })
620                    .collect()
621            })?
622        })
623    }
624
625    pub fn fuzzy_search_users(
626        &self,
627        query: String,
628        cx: &Context<Self>,
629    ) -> Task<Result<Vec<Arc<User>>>> {
630        self.load_users(proto::FuzzySearchUsers { query }, cx)
631    }
632
633    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
634        self.users.get(&user_id).cloned()
635    }
636
637    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
638        if let Some(user) = self.users.get(&user_id).cloned() {
639            return Some(user);
640        }
641
642        self.get_user(user_id, cx).detach_and_log_err(cx);
643        None
644    }
645
646    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
647        if let Some(user) = self.users.get(&user_id).cloned() {
648            return Task::ready(Ok(user));
649        }
650
651        let load_users = self.get_users(vec![user_id], cx);
652        cx.spawn(async move |this, cx| {
653            load_users.await?;
654            this.read_with(cx, |this, _| {
655                this.users
656                    .get(&user_id)
657                    .cloned()
658                    .context("server responded with no users")
659            })?
660        })
661    }
662
663    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
664        self.by_github_login
665            .get(github_login)
666            .and_then(|id| self.users.get(id).cloned())
667    }
668
669    pub fn current_user(&self) -> Option<Arc<User>> {
670        self.current_user.borrow().clone()
671    }
672
673    pub fn current_organization(&self) -> Option<Arc<Organization>> {
674        self.current_organization.clone()
675    }
676
677    pub fn plan(&self) -> Option<Plan> {
678        #[cfg(debug_assertions)]
679        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
680            use cloud_api_client::Plan;
681
682            return match plan.as_str() {
683                "free" => Some(Plan::ZedFree),
684                "trial" => Some(Plan::ZedProTrial),
685                "pro" => Some(Plan::ZedPro),
686                _ => {
687                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
688                }
689            };
690        }
691
692        self.plan_info.as_ref().map(|info| info.plan())
693    }
694
695    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
696        self.plan_info
697            .as_ref()
698            .and_then(|plan| plan.subscription_period)
699            .map(|subscription_period| {
700                (
701                    subscription_period.started_at.0,
702                    subscription_period.ended_at.0,
703                )
704            })
705    }
706
707    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
708        self.plan_info
709            .as_ref()
710            .and_then(|plan| plan.trial_started_at)
711            .map(|trial_started_at| trial_started_at.0)
712    }
713
714    /// Returns whether the user's account is too new to use the service.
715    pub fn account_too_young(&self) -> bool {
716        self.plan_info
717            .as_ref()
718            .map(|plan| plan.is_account_too_young)
719            .unwrap_or_default()
720    }
721
722    /// Returns whether the current user has overdue invoices and usage should be blocked.
723    pub fn has_overdue_invoices(&self) -> bool {
724        self.plan_info
725            .as_ref()
726            .map(|plan| plan.has_overdue_invoices)
727            .unwrap_or_default()
728    }
729
730    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
731        self.edit_prediction_usage
732    }
733
734    pub fn update_edit_prediction_usage(
735        &mut self,
736        usage: EditPredictionUsage,
737        cx: &mut Context<Self>,
738    ) {
739        self.edit_prediction_usage = Some(usage);
740        cx.notify();
741    }
742
743    pub fn clear_organizations(&mut self) {
744        self.organizations.clear();
745        self.current_organization = None;
746    }
747
748    pub fn clear_plan_and_usage(&mut self) {
749        self.plan_info = None;
750        self.edit_prediction_usage = None;
751    }
752
753    fn update_authenticated_user(
754        &mut self,
755        response: GetAuthenticatedUserResponse,
756        cx: &mut Context<Self>,
757    ) {
758        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
759        cx.update_flags(staff, response.feature_flags);
760        if let Some(client) = self.client.upgrade() {
761            client
762                .telemetry
763                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
764        }
765
766        self.organizations = response.organizations.into_iter().map(Arc::new).collect();
767        self.current_organization = self.organizations.first().cloned();
768
769        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
770            limit: response.plan.usage.edit_predictions.limit,
771            amount: response.plan.usage.edit_predictions.used as i32,
772        }));
773        self.plan_info = Some(response.plan);
774        cx.emit(Event::PrivateUserInfoUpdated);
775    }
776
777    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
778        cx.spawn(async move |cx| {
779            match message {
780                MessageToClient::UserUpdated => {
781                    let cloud_client = cx
782                        .update(|cx| {
783                            this.read_with(cx, |this, _cx| {
784                                this.client.upgrade().map(|client| client.cloud_client())
785                            })
786                        })?
787                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
788
789                    let response = cloud_client.get_authenticated_user().await?;
790                    cx.update(|cx| {
791                        this.update(cx, |this, cx| {
792                            this.update_authenticated_user(response, cx);
793                        })
794                    })?;
795                }
796            }
797
798            anyhow::Ok(())
799        })
800        .detach_and_log_err(cx);
801    }
802
803    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
804        self.current_user.clone()
805    }
806
807    fn load_users(
808        &self,
809        request: impl RequestMessage<Response = UsersResponse>,
810        cx: &Context<Self>,
811    ) -> Task<Result<Vec<Arc<User>>>> {
812        let client = self.client.clone();
813        cx.spawn(async move |this, cx| {
814            if let Some(rpc) = client.upgrade() {
815                let response = rpc.request(request).await.context("error loading users")?;
816                let users = response.users;
817
818                this.update(cx, |this, _| this.insert(users))
819            } else {
820                Ok(Vec::new())
821            }
822        })
823    }
824
825    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
826        let mut ret = Vec::with_capacity(users.len());
827        for user in users {
828            let user = User::new(user);
829            if let Some(old) = self.users.insert(user.id, user.clone())
830                && old.github_login != user.github_login
831            {
832                self.by_github_login.remove(&old.github_login);
833            }
834            self.by_github_login
835                .insert(user.github_login.clone(), user.id);
836            ret.push(user)
837        }
838        ret
839    }
840
841    pub fn set_participant_indices(
842        &mut self,
843        participant_indices: HashMap<u64, ParticipantIndex>,
844        cx: &mut Context<Self>,
845    ) {
846        if participant_indices != self.participant_indices {
847            self.participant_indices = participant_indices;
848            cx.emit(Event::ParticipantIndicesChanged);
849        }
850    }
851
852    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
853        &self.participant_indices
854    }
855
856    pub fn participant_names(
857        &self,
858        user_ids: impl Iterator<Item = u64>,
859        cx: &App,
860    ) -> HashMap<u64, SharedString> {
861        let mut ret = HashMap::default();
862        let mut missing_user_ids = Vec::new();
863        for id in user_ids {
864            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
865                ret.insert(id, github_login);
866            } else {
867                missing_user_ids.push(id)
868            }
869        }
870        if !missing_user_ids.is_empty() {
871            let this = self.weak_self.clone();
872            cx.spawn(async move |cx| {
873                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
874                    .await
875            })
876            .detach_and_log_err(cx);
877        }
878        ret
879    }
880}
881
882impl User {
883    fn new(message: proto::User) -> Arc<Self> {
884        Arc::new(User {
885            id: message.id,
886            github_login: message.github_login.into(),
887            avatar_uri: message.avatar_url.into(),
888            name: message.name,
889        })
890    }
891}
892
893impl Contact {
894    async fn from_proto(
895        contact: proto::Contact,
896        user_store: &Entity<UserStore>,
897        cx: &mut AsyncApp,
898    ) -> Result<Self> {
899        let user = user_store
900            .update(cx, |user_store, cx| {
901                user_store.get_user(contact.user_id, cx)
902            })
903            .await?;
904        Ok(Self {
905            user,
906            online: contact.online,
907            busy: contact.busy,
908        })
909    }
910}
911
912impl Collaborator {
913    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
914        Ok(Self {
915            peer_id: message.peer_id.context("invalid peer id")?,
916            replica_id: ReplicaId::new(message.replica_id as u16),
917            user_id: message.user_id as UserId,
918            is_host: message.is_host,
919            committer_name: message.committer_name,
920            committer_email: message.committer_email,
921        })
922    }
923}
924
925impl RequestUsage {
926    pub fn over_limit(&self) -> bool {
927        match self.limit {
928            UsageLimit::Limited(limit) => self.amount >= limit,
929            UsageLimit::Unlimited => false,
930        }
931    }
932
933    fn from_headers(
934        limit_name: &str,
935        amount_name: &str,
936        headers: &HeaderMap<HeaderValue>,
937    ) -> Result<Self> {
938        let limit = headers
939            .get(limit_name)
940            .with_context(|| format!("missing {limit_name:?} header"))?;
941        let limit = UsageLimit::from_str(limit.to_str()?)?;
942
943        let amount = headers
944            .get(amount_name)
945            .with_context(|| format!("missing {amount_name:?} header"))?;
946        let amount = amount.to_str()?.parse::<i32>()?;
947
948        Ok(Self { limit, amount })
949    }
950}
951
952impl EditPredictionUsage {
953    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
954        Ok(Self(RequestUsage::from_headers(
955            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
956            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
957            headers,
958        )?))
959    }
960}