1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
8 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
9};
10use collections::{HashMap, HashSet, hash_map::Entry};
11use derive_more::Deref;
12use feature_flags::FeatureFlagAppExt;
13use futures::{Future, StreamExt, channel::mpsc};
14use gpui::{
15 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
16};
17use http_client::http::{HeaderMap, HeaderValue};
18use postage::{sink::Sink, watch};
19use rpc::proto::{RequestMessage, UsersResponse};
20use std::{
21 str::FromStr as _,
22 sync::{Arc, Weak},
23};
24use text::ReplicaId;
25use util::{ResultExt, TryFutureExt as _};
26
27pub type UserId = u64;
28
29#[derive(
30 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
31)]
32pub struct ChannelId(pub u64);
33
34impl std::fmt::Display for ChannelId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 self.0.fmt(f)
37 }
38}
39
40#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
41pub struct ProjectId(pub u64);
42
43impl ProjectId {
44 pub fn to_proto(self) -> u64 {
45 self.0
46 }
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub struct ParticipantIndex(pub u32);
51
52#[derive(Default, Debug)]
53pub struct User {
54 pub id: UserId,
55 pub github_login: SharedString,
56 pub avatar_uri: SharedUri,
57 pub name: Option<String>,
58}
59
60#[derive(Clone, Debug, PartialEq, Eq)]
61pub struct Collaborator {
62 pub peer_id: proto::PeerId,
63 pub replica_id: ReplicaId,
64 pub user_id: UserId,
65 pub is_host: bool,
66 pub committer_name: Option<String>,
67 pub committer_email: Option<String>,
68}
69
70impl PartialOrd for User {
71 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
72 Some(self.cmp(other))
73 }
74}
75
76impl Ord for User {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 self.github_login.cmp(&other.github_login)
79 }
80}
81
82impl PartialEq for User {
83 fn eq(&self, other: &Self) -> bool {
84 self.id == other.id && self.github_login == other.github_login
85 }
86}
87
88impl Eq for User {}
89
90#[derive(Debug, PartialEq)]
91pub struct Contact {
92 pub user: Arc<User>,
93 pub online: bool,
94 pub busy: bool,
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum ContactRequestStatus {
99 None,
100 RequestSent,
101 RequestReceived,
102 RequestAccepted,
103}
104
105pub struct UserStore {
106 users: HashMap<u64, Arc<User>>,
107 by_github_login: HashMap<SharedString, u64>,
108 participant_indices: HashMap<u64, ParticipantIndex>,
109 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
110 model_request_usage: Option<ModelRequestUsage>,
111 edit_prediction_usage: Option<EditPredictionUsage>,
112 plan_info: Option<PlanInfo>,
113 current_user: watch::Receiver<Option<Arc<User>>>,
114 contacts: Vec<Arc<Contact>>,
115 incoming_contact_requests: Vec<Arc<User>>,
116 outgoing_contact_requests: Vec<Arc<User>>,
117 pending_contact_requests: HashMap<u64, usize>,
118 invite_info: Option<InviteInfo>,
119 client: Weak<Client>,
120 _maintain_contacts: Task<()>,
121 _maintain_current_user: Task<Result<()>>,
122 weak_self: WeakEntity<Self>,
123}
124
125#[derive(Clone)]
126pub struct InviteInfo {
127 pub count: u32,
128 pub url: Arc<str>,
129}
130
131pub enum Event {
132 Contact {
133 user: Arc<User>,
134 kind: ContactEventKind,
135 },
136 ShowContacts,
137 ParticipantIndicesChanged,
138 PrivateUserInfoUpdated,
139 PlanUpdated,
140}
141
142#[derive(Clone, Copy)]
143pub enum ContactEventKind {
144 Requested,
145 Accepted,
146 Cancelled,
147}
148
149impl EventEmitter<Event> for UserStore {}
150
151enum UpdateContacts {
152 Update(proto::UpdateContacts),
153 Wait(postage::barrier::Sender),
154 Clear(postage::barrier::Sender),
155}
156
157#[derive(Debug, Clone, Copy, Deref)]
158pub struct ModelRequestUsage(pub RequestUsage);
159
160#[derive(Debug, Clone, Copy, Deref)]
161pub struct EditPredictionUsage(pub RequestUsage);
162
163#[derive(Debug, Clone, Copy)]
164pub struct RequestUsage {
165 pub limit: UsageLimit,
166 pub amount: i32,
167}
168
169impl UserStore {
170 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
171 let (mut current_user_tx, current_user_rx) = watch::channel();
172 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
173 let rpc_subscriptions = vec![
174 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
175 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
176 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
177 ];
178
179 client.add_message_to_client_handler({
180 let this = cx.weak_entity();
181 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
182 });
183
184 Self {
185 users: Default::default(),
186 by_github_login: Default::default(),
187 current_user: current_user_rx,
188 plan_info: None,
189 model_request_usage: None,
190 edit_prediction_usage: None,
191 contacts: Default::default(),
192 incoming_contact_requests: Default::default(),
193 participant_indices: Default::default(),
194 outgoing_contact_requests: Default::default(),
195 invite_info: None,
196 client: Arc::downgrade(&client),
197 update_contacts_tx,
198 _maintain_contacts: cx.spawn(async move |this, cx| {
199 let _subscriptions = rpc_subscriptions;
200 while let Some(message) = update_contacts_rx.next().await {
201 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
202 {
203 task.log_err().await;
204 } else {
205 break;
206 }
207 }
208 }),
209 _maintain_current_user: cx.spawn(async move |this, cx| {
210 let mut status = client.status();
211 let weak = Arc::downgrade(&client);
212 drop(client);
213 while let Some(status) = status.next().await {
214 // if the client is dropped, the app is shutting down.
215 let Some(client) = weak.upgrade() else {
216 return Ok(());
217 };
218 match status {
219 Status::Authenticated | Status::Connected { .. } => {
220 if let Some(user_id) = client.user_id() {
221 let response = client
222 .cloud_client()
223 .get_authenticated_user()
224 .await
225 .log_err();
226
227 let current_user_and_response = if let Some(response) = response {
228 let user = Arc::new(User {
229 id: user_id,
230 github_login: response.user.github_login.clone().into(),
231 avatar_uri: response.user.avatar_url.clone().into(),
232 name: response.user.name.clone(),
233 });
234
235 Some((user, response))
236 } else {
237 None
238 };
239 current_user_tx
240 .send(
241 current_user_and_response
242 .as_ref()
243 .map(|(user, _)| user.clone()),
244 )
245 .await
246 .ok();
247
248 cx.update(|cx| {
249 if let Some((user, response)) = current_user_and_response {
250 this.update(cx, |this, cx| {
251 this.by_github_login
252 .insert(user.github_login.clone(), user_id);
253 this.users.insert(user_id, user);
254 this.update_authenticated_user(response, cx)
255 })
256 } else {
257 anyhow::Ok(())
258 }
259 })??;
260
261 this.update(cx, |_, cx| cx.notify())?;
262 }
263 }
264 Status::SignedOut => {
265 current_user_tx.send(None).await.ok();
266 this.update(cx, |this, cx| {
267 cx.emit(Event::PrivateUserInfoUpdated);
268 cx.notify();
269 this.clear_contacts()
270 })?
271 .await;
272 }
273 Status::ConnectionLost => {
274 this.update(cx, |this, cx| {
275 cx.notify();
276 this.clear_contacts()
277 })?
278 .await;
279 }
280 _ => {}
281 }
282 }
283 Ok(())
284 }),
285 pending_contact_requests: Default::default(),
286 weak_self: cx.weak_entity(),
287 }
288 }
289
290 #[cfg(feature = "test-support")]
291 pub fn clear_cache(&mut self) {
292 self.users.clear();
293 self.by_github_login.clear();
294 }
295
296 async fn handle_update_invite_info(
297 this: Entity<Self>,
298 message: TypedEnvelope<proto::UpdateInviteInfo>,
299 mut cx: AsyncApp,
300 ) -> Result<()> {
301 this.update(&mut cx, |this, cx| {
302 this.invite_info = Some(InviteInfo {
303 url: Arc::from(message.payload.url),
304 count: message.payload.count,
305 });
306 cx.notify();
307 })?;
308 Ok(())
309 }
310
311 async fn handle_show_contacts(
312 this: Entity<Self>,
313 _: TypedEnvelope<proto::ShowContacts>,
314 mut cx: AsyncApp,
315 ) -> Result<()> {
316 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
317 Ok(())
318 }
319
320 pub fn invite_info(&self) -> Option<&InviteInfo> {
321 self.invite_info.as_ref()
322 }
323
324 async fn handle_update_contacts(
325 this: Entity<Self>,
326 message: TypedEnvelope<proto::UpdateContacts>,
327 cx: AsyncApp,
328 ) -> Result<()> {
329 this.read_with(&cx, |this, _| {
330 this.update_contacts_tx
331 .unbounded_send(UpdateContacts::Update(message.payload))
332 .unwrap();
333 })?;
334 Ok(())
335 }
336
337 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
338 match message {
339 UpdateContacts::Wait(barrier) => {
340 drop(barrier);
341 Task::ready(Ok(()))
342 }
343 UpdateContacts::Clear(barrier) => {
344 self.contacts.clear();
345 self.incoming_contact_requests.clear();
346 self.outgoing_contact_requests.clear();
347 drop(barrier);
348 Task::ready(Ok(()))
349 }
350 UpdateContacts::Update(message) => {
351 let mut user_ids = HashSet::default();
352 for contact in &message.contacts {
353 user_ids.insert(contact.user_id);
354 }
355 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
356 user_ids.extend(message.outgoing_requests.iter());
357
358 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
359 cx.spawn(async move |this, cx| {
360 load_users.await?;
361
362 // Users are fetched in parallel above and cached in call to get_users
363 // No need to parallelize here
364 let mut updated_contacts = Vec::new();
365 let this = this.upgrade().context("can't upgrade user store handle")?;
366 for contact in message.contacts {
367 updated_contacts
368 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
369 }
370
371 let mut incoming_requests = Vec::new();
372 for request in message.incoming_requests {
373 incoming_requests.push({
374 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
375 .await?
376 });
377 }
378
379 let mut outgoing_requests = Vec::new();
380 for requested_user_id in message.outgoing_requests {
381 outgoing_requests.push(
382 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
383 .await?,
384 );
385 }
386
387 let removed_contacts =
388 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
389 let removed_incoming_requests =
390 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
391 let removed_outgoing_requests =
392 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
393
394 this.update(cx, |this, cx| {
395 // Remove contacts
396 this.contacts
397 .retain(|contact| !removed_contacts.contains(&contact.user.id));
398 // Update existing contacts and insert new ones
399 for updated_contact in updated_contacts {
400 match this.contacts.binary_search_by_key(
401 &&updated_contact.user.github_login,
402 |contact| &contact.user.github_login,
403 ) {
404 Ok(ix) => this.contacts[ix] = updated_contact,
405 Err(ix) => this.contacts.insert(ix, updated_contact),
406 }
407 }
408
409 // Remove incoming contact requests
410 this.incoming_contact_requests.retain(|user| {
411 if removed_incoming_requests.contains(&user.id) {
412 cx.emit(Event::Contact {
413 user: user.clone(),
414 kind: ContactEventKind::Cancelled,
415 });
416 false
417 } else {
418 true
419 }
420 });
421 // Update existing incoming requests and insert new ones
422 for user in incoming_requests {
423 match this
424 .incoming_contact_requests
425 .binary_search_by_key(&&user.github_login, |contact| {
426 &contact.github_login
427 }) {
428 Ok(ix) => this.incoming_contact_requests[ix] = user,
429 Err(ix) => this.incoming_contact_requests.insert(ix, user),
430 }
431 }
432
433 // Remove outgoing contact requests
434 this.outgoing_contact_requests
435 .retain(|user| !removed_outgoing_requests.contains(&user.id));
436 // Update existing incoming requests and insert new ones
437 for request in outgoing_requests {
438 match this
439 .outgoing_contact_requests
440 .binary_search_by_key(&&request.github_login, |contact| {
441 &contact.github_login
442 }) {
443 Ok(ix) => this.outgoing_contact_requests[ix] = request,
444 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
445 }
446 }
447
448 cx.notify();
449 })?;
450
451 Ok(())
452 })
453 }
454 }
455 }
456
457 pub fn contacts(&self) -> &[Arc<Contact>] {
458 &self.contacts
459 }
460
461 pub fn has_contact(&self, user: &Arc<User>) -> bool {
462 self.contacts
463 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
464 .is_ok()
465 }
466
467 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
468 &self.incoming_contact_requests
469 }
470
471 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
472 &self.outgoing_contact_requests
473 }
474
475 pub fn is_contact_request_pending(&self, user: &User) -> bool {
476 self.pending_contact_requests.contains_key(&user.id)
477 }
478
479 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
480 if self
481 .contacts
482 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
483 .is_ok()
484 {
485 ContactRequestStatus::RequestAccepted
486 } else if self
487 .outgoing_contact_requests
488 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
489 .is_ok()
490 {
491 ContactRequestStatus::RequestSent
492 } else if self
493 .incoming_contact_requests
494 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
495 .is_ok()
496 {
497 ContactRequestStatus::RequestReceived
498 } else {
499 ContactRequestStatus::None
500 }
501 }
502
503 pub fn request_contact(
504 &mut self,
505 responder_id: u64,
506 cx: &mut Context<Self>,
507 ) -> Task<Result<()>> {
508 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
509 }
510
511 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
512 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
513 }
514
515 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
516 self.incoming_contact_requests
517 .iter()
518 .any(|user| user.id == user_id)
519 }
520
521 pub fn respond_to_contact_request(
522 &mut self,
523 requester_id: u64,
524 accept: bool,
525 cx: &mut Context<Self>,
526 ) -> Task<Result<()>> {
527 self.perform_contact_request(
528 requester_id,
529 proto::RespondToContactRequest {
530 requester_id,
531 response: if accept {
532 proto::ContactRequestResponse::Accept
533 } else {
534 proto::ContactRequestResponse::Decline
535 } as i32,
536 },
537 cx,
538 )
539 }
540
541 pub fn dismiss_contact_request(
542 &self,
543 requester_id: u64,
544 cx: &Context<Self>,
545 ) -> Task<Result<()>> {
546 let client = self.client.upgrade();
547 cx.spawn(async move |_, _| {
548 client
549 .context("can't upgrade client reference")?
550 .request(proto::RespondToContactRequest {
551 requester_id,
552 response: proto::ContactRequestResponse::Dismiss as i32,
553 })
554 .await?;
555 Ok(())
556 })
557 }
558
559 fn perform_contact_request<T: RequestMessage>(
560 &mut self,
561 user_id: u64,
562 request: T,
563 cx: &mut Context<Self>,
564 ) -> Task<Result<()>> {
565 let client = self.client.upgrade();
566 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
567 cx.notify();
568
569 cx.spawn(async move |this, cx| {
570 let response = client
571 .context("can't upgrade client reference")?
572 .request(request)
573 .await;
574 this.update(cx, |this, cx| {
575 if let Entry::Occupied(mut request_count) =
576 this.pending_contact_requests.entry(user_id)
577 {
578 *request_count.get_mut() -= 1;
579 if *request_count.get() == 0 {
580 request_count.remove();
581 }
582 }
583 cx.notify();
584 })?;
585 response?;
586 Ok(())
587 })
588 }
589
590 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
591 let (tx, mut rx) = postage::barrier::channel();
592 self.update_contacts_tx
593 .unbounded_send(UpdateContacts::Clear(tx))
594 .unwrap();
595 async move {
596 rx.next().await;
597 }
598 }
599
600 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
601 let (tx, mut rx) = postage::barrier::channel();
602 self.update_contacts_tx
603 .unbounded_send(UpdateContacts::Wait(tx))
604 .unwrap();
605 async move {
606 rx.next().await;
607 }
608 }
609
610 pub fn get_users(
611 &self,
612 user_ids: Vec<u64>,
613 cx: &Context<Self>,
614 ) -> Task<Result<Vec<Arc<User>>>> {
615 let mut user_ids_to_fetch = user_ids.clone();
616 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
617
618 cx.spawn(async move |this, cx| {
619 if !user_ids_to_fetch.is_empty() {
620 this.update(cx, |this, cx| {
621 this.load_users(
622 proto::GetUsers {
623 user_ids: user_ids_to_fetch,
624 },
625 cx,
626 )
627 })?
628 .await?;
629 }
630
631 this.read_with(cx, |this, _| {
632 user_ids
633 .iter()
634 .map(|user_id| {
635 this.users
636 .get(user_id)
637 .cloned()
638 .with_context(|| format!("user {user_id} not found"))
639 })
640 .collect()
641 })?
642 })
643 }
644
645 pub fn fuzzy_search_users(
646 &self,
647 query: String,
648 cx: &Context<Self>,
649 ) -> Task<Result<Vec<Arc<User>>>> {
650 self.load_users(proto::FuzzySearchUsers { query }, cx)
651 }
652
653 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
654 self.users.get(&user_id).cloned()
655 }
656
657 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
658 if let Some(user) = self.users.get(&user_id).cloned() {
659 return Some(user);
660 }
661
662 self.get_user(user_id, cx).detach_and_log_err(cx);
663 None
664 }
665
666 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
667 if let Some(user) = self.users.get(&user_id).cloned() {
668 return Task::ready(Ok(user));
669 }
670
671 let load_users = self.get_users(vec![user_id], cx);
672 cx.spawn(async move |this, cx| {
673 load_users.await?;
674 this.read_with(cx, |this, _| {
675 this.users
676 .get(&user_id)
677 .cloned()
678 .context("server responded with no users")
679 })?
680 })
681 }
682
683 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
684 self.by_github_login
685 .get(github_login)
686 .and_then(|id| self.users.get(id).cloned())
687 }
688
689 pub fn current_user(&self) -> Option<Arc<User>> {
690 self.current_user.borrow().clone()
691 }
692
693 pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
694 #[cfg(debug_assertions)]
695 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
696 return match plan.as_str() {
697 "free" => Some(cloud_llm_client::Plan::ZedFree),
698 "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
699 "pro" => Some(cloud_llm_client::Plan::ZedPro),
700 _ => {
701 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
702 }
703 };
704 }
705
706 self.plan_info.as_ref().map(|info| info.plan)
707 }
708
709 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
710 self.plan_info
711 .as_ref()
712 .and_then(|plan| plan.subscription_period)
713 .map(|subscription_period| {
714 (
715 subscription_period.started_at.0,
716 subscription_period.ended_at.0,
717 )
718 })
719 }
720
721 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
722 self.plan_info
723 .as_ref()
724 .and_then(|plan| plan.trial_started_at)
725 .map(|trial_started_at| trial_started_at.0)
726 }
727
728 /// Returns whether the user's account is too new to use the service.
729 pub fn account_too_young(&self) -> bool {
730 self.plan_info
731 .as_ref()
732 .map(|plan| plan.is_account_too_young)
733 .unwrap_or_default()
734 }
735
736 /// Returns whether the current user has overdue invoices and usage should be blocked.
737 pub fn has_overdue_invoices(&self) -> bool {
738 self.plan_info
739 .as_ref()
740 .map(|plan| plan.has_overdue_invoices)
741 .unwrap_or_default()
742 }
743
744 pub fn is_usage_based_billing_enabled(&self) -> bool {
745 self.plan_info
746 .as_ref()
747 .map(|plan| plan.is_usage_based_billing_enabled)
748 .unwrap_or_default()
749 }
750
751 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
752 self.model_request_usage
753 }
754
755 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
756 self.model_request_usage = Some(usage);
757 cx.notify();
758 }
759
760 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
761 self.edit_prediction_usage
762 }
763
764 pub fn update_edit_prediction_usage(
765 &mut self,
766 usage: EditPredictionUsage,
767 cx: &mut Context<Self>,
768 ) {
769 self.edit_prediction_usage = Some(usage);
770 cx.notify();
771 }
772
773 fn update_authenticated_user(
774 &mut self,
775 response: GetAuthenticatedUserResponse,
776 cx: &mut Context<Self>,
777 ) {
778 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
779 cx.update_flags(staff, response.feature_flags);
780 if let Some(client) = self.client.upgrade() {
781 client
782 .telemetry
783 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
784 }
785
786 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
787 limit: response.plan.usage.model_requests.limit,
788 amount: response.plan.usage.model_requests.used as i32,
789 }));
790 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
791 limit: response.plan.usage.edit_predictions.limit,
792 amount: response.plan.usage.edit_predictions.used as i32,
793 }));
794 self.plan_info = Some(response.plan);
795 cx.emit(Event::PrivateUserInfoUpdated);
796 }
797
798 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
799 cx.spawn(async move |cx| {
800 match message {
801 MessageToClient::UserUpdated => {
802 let cloud_client = cx
803 .update(|cx| {
804 this.read_with(cx, |this, _cx| {
805 this.client.upgrade().map(|client| client.cloud_client())
806 })
807 })??
808 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
809
810 let response = cloud_client.get_authenticated_user().await?;
811 cx.update(|cx| {
812 this.update(cx, |this, cx| {
813 this.update_authenticated_user(response, cx);
814 })
815 })??;
816 }
817 }
818
819 anyhow::Ok(())
820 })
821 .detach_and_log_err(cx);
822 }
823
824 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
825 self.current_user.clone()
826 }
827
828 fn load_users(
829 &self,
830 request: impl RequestMessage<Response = UsersResponse>,
831 cx: &Context<Self>,
832 ) -> Task<Result<Vec<Arc<User>>>> {
833 let client = self.client.clone();
834 cx.spawn(async move |this, cx| {
835 if let Some(rpc) = client.upgrade() {
836 let response = rpc.request(request).await.context("error loading users")?;
837 let users = response.users;
838
839 this.update(cx, |this, _| this.insert(users))
840 } else {
841 Ok(Vec::new())
842 }
843 })
844 }
845
846 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
847 let mut ret = Vec::with_capacity(users.len());
848 for user in users {
849 let user = User::new(user);
850 if let Some(old) = self.users.insert(user.id, user.clone())
851 && old.github_login != user.github_login
852 {
853 self.by_github_login.remove(&old.github_login);
854 }
855 self.by_github_login
856 .insert(user.github_login.clone(), user.id);
857 ret.push(user)
858 }
859 ret
860 }
861
862 pub fn set_participant_indices(
863 &mut self,
864 participant_indices: HashMap<u64, ParticipantIndex>,
865 cx: &mut Context<Self>,
866 ) {
867 if participant_indices != self.participant_indices {
868 self.participant_indices = participant_indices;
869 cx.emit(Event::ParticipantIndicesChanged);
870 }
871 }
872
873 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
874 &self.participant_indices
875 }
876
877 pub fn participant_names(
878 &self,
879 user_ids: impl Iterator<Item = u64>,
880 cx: &App,
881 ) -> HashMap<u64, SharedString> {
882 let mut ret = HashMap::default();
883 let mut missing_user_ids = Vec::new();
884 for id in user_ids {
885 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
886 ret.insert(id, github_login);
887 } else {
888 missing_user_ids.push(id)
889 }
890 }
891 if !missing_user_ids.is_empty() {
892 let this = self.weak_self.clone();
893 cx.spawn(async move |cx| {
894 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
895 .await
896 })
897 .detach_and_log_err(cx);
898 }
899 ret
900 }
901}
902
903impl User {
904 fn new(message: proto::User) -> Arc<Self> {
905 Arc::new(User {
906 id: message.id,
907 github_login: message.github_login.into(),
908 avatar_uri: message.avatar_url.into(),
909 name: message.name,
910 })
911 }
912}
913
914impl Contact {
915 async fn from_proto(
916 contact: proto::Contact,
917 user_store: &Entity<UserStore>,
918 cx: &mut AsyncApp,
919 ) -> Result<Self> {
920 let user = user_store
921 .update(cx, |user_store, cx| {
922 user_store.get_user(contact.user_id, cx)
923 })?
924 .await?;
925 Ok(Self {
926 user,
927 online: contact.online,
928 busy: contact.busy,
929 })
930 }
931}
932
933impl Collaborator {
934 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
935 Ok(Self {
936 peer_id: message.peer_id.context("invalid peer id")?,
937 replica_id: message.replica_id as ReplicaId,
938 user_id: message.user_id as UserId,
939 is_host: message.is_host,
940 committer_name: message.committer_name,
941 committer_email: message.committer_email,
942 })
943 }
944}
945
946impl RequestUsage {
947 pub fn over_limit(&self) -> bool {
948 match self.limit {
949 UsageLimit::Limited(limit) => self.amount >= limit,
950 UsageLimit::Unlimited => false,
951 }
952 }
953
954 fn from_headers(
955 limit_name: &str,
956 amount_name: &str,
957 headers: &HeaderMap<HeaderValue>,
958 ) -> Result<Self> {
959 let limit = headers
960 .get(limit_name)
961 .with_context(|| format!("missing {limit_name:?} header"))?;
962 let limit = UsageLimit::from_str(limit.to_str()?)?;
963
964 let amount = headers
965 .get(amount_name)
966 .with_context(|| format!("missing {amount_name:?} header"))?;
967 let amount = amount.to_str()?.parse::<i32>()?;
968
969 Ok(Self { limit, amount })
970 }
971}
972
973impl ModelRequestUsage {
974 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
975 Ok(Self(RequestUsage::from_headers(
976 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
977 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
978 headers,
979 )?))
980 }
981}
982
983impl EditPredictionUsage {
984 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
985 Ok(Self(RequestUsage::from_headers(
986 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
987 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
988 headers,
989 )?))
990 }
991}