1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
8 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, Plan,
9 UsageLimit,
10};
11use collections::{HashMap, HashSet, hash_map::Entry};
12use derive_more::Deref;
13use feature_flags::FeatureFlagAppExt;
14use futures::{Future, StreamExt, channel::mpsc};
15use gpui::{
16 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
17};
18use http_client::http::{HeaderMap, HeaderValue};
19use postage::{sink::Sink, watch};
20use rpc::proto::{RequestMessage, UsersResponse};
21use std::{
22 str::FromStr as _,
23 sync::{Arc, Weak},
24};
25use text::ReplicaId;
26use util::{ResultExt, TryFutureExt as _};
27
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
29
/// Newtype wrapping the raw `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
34
35impl std::fmt::Display for ChannelId {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 self.0.fmt(f)
38 }
39}
40
/// Newtype wrapping the raw `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the wrapped `u64`.
    pub fn to_proto(self) -> u64 {
        let Self(raw_id) = self;
        raw_id
    }
}
49
/// Newtype wrapping a participant's `u32` index.
// NOTE(review): what the index is used for is not visible in this file — confirm with callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
52
/// A user as cached by [`UserStore`].
#[derive(Default, Debug)]
pub struct User {
    /// Identifier of the user.
    pub id: UserId,
    /// GitHub login; also the key by which contact lists are kept sorted.
    pub github_login: SharedString,
    /// Avatar location, taken from the server-provided avatar URL.
    pub avatar_uri: SharedUri,
    /// Optional name reported by the server.
    pub name: Option<String>,
}
60
/// A collaborator, as decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether the collaborator is the host (copied verbatim from the message).
    pub is_host: bool,
    /// Optional committer name carried by the message.
    pub committer_name: Option<String>,
    /// Optional committer email carried by the message.
    pub committer_email: Option<String>,
}
70
71impl PartialOrd for User {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
77impl Ord for User {
78 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
79 self.github_login.cmp(&other.github_login)
80 }
81}
82
83impl PartialEq for User {
84 fn eq(&self, other: &Self) -> bool {
85 self.id == other.id && self.github_login == other.github_login
86 }
87}
88
89impl Eq for User {}
90
/// A contact of the current user, as decoded from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// `online` flag copied from the server message.
    pub online: bool,
    /// `busy` flag copied from the server message.
    pub busy: bool,
}
97
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
105
/// Caches users and tracks the signed-in user's contacts, contact requests, plan and usage.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to the id of the cached user.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest known model-request usage, if any.
    model_request_usage: Option<ModelRequestUsage>,
    /// Latest known edit-prediction usage, if any.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information from the most recent authenticated-user response.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the signed-in user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info pushed by the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, upgraded on demand when issuing requests.
    client: Weak<Client>,
    /// Background task applying queued contact updates one at a time.
    _maintain_contacts: Task<()>,
    /// Background task following the client status to maintain the current user.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&App` contexts.
    weak_self: WeakEntity<Self>,
}
125
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message.
    // NOTE(review): presumably the number of invites — confirm against the server.
    pub count: u32,
    /// `url` field of the message.
    pub url: Arc<str>,
}
131
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different set of indices.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user's info is refreshed, and on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): not emitted anywhere in this file — confirm where it is produced.
    PlanUpdated,
}
142
/// What happened to the contact request described by an [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    // NOTE(review): `Requested` and `Accepted` are not emitted in this file — confirm producers.
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed by a server update.
    Cancelled,
}
149
// Marks `UserStore` as an emitter of `Event`, enabling `cx.emit(Event::..)` in its methods.
impl EventEmitter<Event> for UserStore {}
151
/// Messages processed, in order, by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// No-op marker: the barrier sender is dropped once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
157
/// [`RequestUsage`] for model requests; derefs to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
160
/// [`RequestUsage`] for edit predictions; derefs to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
163
/// A usage counter paired with its limit.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit the amount is compared against (may be unlimited).
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
169
170impl UserStore {
    /// Creates the store and starts its two background tasks.
    ///
    /// Registers RPC handlers for contact updates, invite-info updates and "show contacts"
    /// messages, plus a handler for cloud `MessageToClient` messages. The returned store owns:
    /// - `_maintain_contacts`, which applies queued [`UpdateContacts`] messages one at a time;
    /// - `_maintain_current_user`, which follows the client's status stream and refreshes or
    ///   clears the current user accordingly.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            plan_info: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moved into the task so the subscriptions live as long as it does
                // (presumably dropping them would unregister the handlers — confirm).
                let _subscriptions = rpc_subscriptions;
                // Each update is awaited before the next one is taken, so queued messages
                // are applied strictly in order.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store entity can no longer be updated; stop the task.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Keep only a weak handle between status changes so this task does not
                // keep the client alive on its own.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no current user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the (possibly absent) user to watchers first.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                // Then cache the user and apply plan/usage/flags from the response.
                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Signing out clears the current user and the contact lists.
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Losing the connection clears contacts but keeps the current user.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
292
293 #[cfg(feature = "test-support")]
294 pub fn clear_cache(&mut self) {
295 self.users.clear();
296 self.by_github_login.clear();
297 }
298
299 async fn handle_update_invite_info(
300 this: Entity<Self>,
301 message: TypedEnvelope<proto::UpdateInviteInfo>,
302 mut cx: AsyncApp,
303 ) -> Result<()> {
304 this.update(&mut cx, |this, cx| {
305 this.invite_info = Some(InviteInfo {
306 url: Arc::from(message.payload.url),
307 count: message.payload.count,
308 });
309 cx.notify();
310 })?;
311 Ok(())
312 }
313
314 async fn handle_show_contacts(
315 this: Entity<Self>,
316 _: TypedEnvelope<proto::ShowContacts>,
317 mut cx: AsyncApp,
318 ) -> Result<()> {
319 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
320 Ok(())
321 }
322
323 pub fn invite_info(&self) -> Option<&InviteInfo> {
324 self.invite_info.as_ref()
325 }
326
327 async fn handle_update_contacts(
328 this: Entity<Self>,
329 message: TypedEnvelope<proto::UpdateContacts>,
330 cx: AsyncApp,
331 ) -> Result<()> {
332 this.read_with(&cx, |this, _| {
333 this.update_contacts_tx
334 .unbounded_send(UpdateContacts::Update(message.payload))
335 .unwrap();
336 })?;
337 Ok(())
338 }
339
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// `Wait` and `Clear` complete immediately (dropping their barrier sender signals the
    /// waiter). `Update` first loads every referenced user, then removes and upserts
    /// entries in the contact list and both request lists, keeping each list sorted by
    /// GitHub login, and finally notifies observers.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be fetched
                // with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
459
460 pub fn contacts(&self) -> &[Arc<Contact>] {
461 &self.contacts
462 }
463
464 pub fn has_contact(&self, user: &Arc<User>) -> bool {
465 self.contacts
466 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
467 .is_ok()
468 }
469
470 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
471 &self.incoming_contact_requests
472 }
473
474 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
475 &self.outgoing_contact_requests
476 }
477
478 pub fn is_contact_request_pending(&self, user: &User) -> bool {
479 self.pending_contact_requests.contains_key(&user.id)
480 }
481
482 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
483 if self
484 .contacts
485 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
486 .is_ok()
487 {
488 ContactRequestStatus::RequestAccepted
489 } else if self
490 .outgoing_contact_requests
491 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
492 .is_ok()
493 {
494 ContactRequestStatus::RequestSent
495 } else if self
496 .incoming_contact_requests
497 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
498 .is_ok()
499 {
500 ContactRequestStatus::RequestReceived
501 } else {
502 ContactRequestStatus::None
503 }
504 }
505
506 pub fn request_contact(
507 &mut self,
508 responder_id: u64,
509 cx: &mut Context<Self>,
510 ) -> Task<Result<()>> {
511 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
512 }
513
514 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
515 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
516 }
517
518 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
519 self.incoming_contact_requests
520 .iter()
521 .any(|user| user.id == user_id)
522 }
523
524 pub fn respond_to_contact_request(
525 &mut self,
526 requester_id: u64,
527 accept: bool,
528 cx: &mut Context<Self>,
529 ) -> Task<Result<()>> {
530 self.perform_contact_request(
531 requester_id,
532 proto::RespondToContactRequest {
533 requester_id,
534 response: if accept {
535 proto::ContactRequestResponse::Accept
536 } else {
537 proto::ContactRequestResponse::Decline
538 } as i32,
539 },
540 cx,
541 )
542 }
543
544 pub fn dismiss_contact_request(
545 &self,
546 requester_id: u64,
547 cx: &Context<Self>,
548 ) -> Task<Result<()>> {
549 let client = self.client.upgrade();
550 cx.spawn(async move |_, _| {
551 client
552 .context("can't upgrade client reference")?
553 .request(proto::RespondToContactRequest {
554 requester_id,
555 response: proto::ContactRequestResponse::Dismiss as i32,
556 })
557 .await?;
558 Ok(())
559 })
560 }
561
562 fn perform_contact_request<T: RequestMessage>(
563 &mut self,
564 user_id: u64,
565 request: T,
566 cx: &mut Context<Self>,
567 ) -> Task<Result<()>> {
568 let client = self.client.upgrade();
569 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
570 cx.notify();
571
572 cx.spawn(async move |this, cx| {
573 let response = client
574 .context("can't upgrade client reference")?
575 .request(request)
576 .await;
577 this.update(cx, |this, cx| {
578 if let Entry::Occupied(mut request_count) =
579 this.pending_contact_requests.entry(user_id)
580 {
581 *request_count.get_mut() -= 1;
582 if *request_count.get() == 0 {
583 request_count.remove();
584 }
585 }
586 cx.notify();
587 })?;
588 response?;
589 Ok(())
590 })
591 }
592
593 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
594 let (tx, mut rx) = postage::barrier::channel();
595 self.update_contacts_tx
596 .unbounded_send(UpdateContacts::Clear(tx))
597 .unwrap();
598 async move {
599 rx.next().await;
600 }
601 }
602
603 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
604 let (tx, mut rx) = postage::barrier::channel();
605 self.update_contacts_tx
606 .unbounded_send(UpdateContacts::Wait(tx))
607 .unwrap();
608 async move {
609 rx.next().await;
610 }
611 }
612
613 pub fn get_users(
614 &self,
615 user_ids: Vec<u64>,
616 cx: &Context<Self>,
617 ) -> Task<Result<Vec<Arc<User>>>> {
618 let mut user_ids_to_fetch = user_ids.clone();
619 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
620
621 cx.spawn(async move |this, cx| {
622 if !user_ids_to_fetch.is_empty() {
623 this.update(cx, |this, cx| {
624 this.load_users(
625 proto::GetUsers {
626 user_ids: user_ids_to_fetch,
627 },
628 cx,
629 )
630 })?
631 .await?;
632 }
633
634 this.read_with(cx, |this, _| {
635 user_ids
636 .iter()
637 .map(|user_id| {
638 this.users
639 .get(user_id)
640 .cloned()
641 .with_context(|| format!("user {user_id} not found"))
642 })
643 .collect()
644 })?
645 })
646 }
647
648 pub fn fuzzy_search_users(
649 &self,
650 query: String,
651 cx: &Context<Self>,
652 ) -> Task<Result<Vec<Arc<User>>>> {
653 self.load_users(proto::FuzzySearchUsers { query }, cx)
654 }
655
656 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
657 self.users.get(&user_id).cloned()
658 }
659
660 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
661 if let Some(user) = self.users.get(&user_id).cloned() {
662 return Some(user);
663 }
664
665 self.get_user(user_id, cx).detach_and_log_err(cx);
666 None
667 }
668
669 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
670 if let Some(user) = self.users.get(&user_id).cloned() {
671 return Task::ready(Ok(user));
672 }
673
674 let load_users = self.get_users(vec![user_id], cx);
675 cx.spawn(async move |this, cx| {
676 load_users.await?;
677 this.read_with(cx, |this, _| {
678 this.users
679 .get(&user_id)
680 .cloned()
681 .context("server responded with no users")
682 })?
683 })
684 }
685
686 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
687 self.by_github_login
688 .get(github_login)
689 .and_then(|id| self.users.get(id).cloned())
690 }
691
692 pub fn current_user(&self) -> Option<Arc<User>> {
693 self.current_user.borrow().clone()
694 }
695
696 pub fn plan(&self) -> Option<Plan> {
697 #[cfg(debug_assertions)]
698 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
699 use cloud_llm_client::PlanV1;
700
701 return match plan.as_str() {
702 "free" => Some(Plan::V1(PlanV1::ZedFree)),
703 "trial" => Some(Plan::V1(PlanV1::ZedProTrial)),
704 "pro" => Some(Plan::V1(PlanV1::ZedPro)),
705 _ => {
706 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
707 }
708 };
709 }
710
711 self.plan_info.as_ref().map(|info| info.plan())
712 }
713
714 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
715 self.plan_info
716 .as_ref()
717 .and_then(|plan| plan.subscription_period)
718 .map(|subscription_period| {
719 (
720 subscription_period.started_at.0,
721 subscription_period.ended_at.0,
722 )
723 })
724 }
725
726 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
727 self.plan_info
728 .as_ref()
729 .and_then(|plan| plan.trial_started_at)
730 .map(|trial_started_at| trial_started_at.0)
731 }
732
733 /// Returns whether the user's account is too new to use the service.
734 pub fn account_too_young(&self) -> bool {
735 self.plan_info
736 .as_ref()
737 .map(|plan| plan.is_account_too_young)
738 .unwrap_or_default()
739 }
740
741 /// Returns whether the current user has overdue invoices and usage should be blocked.
742 pub fn has_overdue_invoices(&self) -> bool {
743 self.plan_info
744 .as_ref()
745 .map(|plan| plan.has_overdue_invoices)
746 .unwrap_or_default()
747 }
748
749 pub fn is_usage_based_billing_enabled(&self) -> bool {
750 self.plan_info
751 .as_ref()
752 .map(|plan| plan.is_usage_based_billing_enabled)
753 .unwrap_or_default()
754 }
755
756 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
757 if self.plan().is_some_and(|plan| plan.is_v2()) {
758 return None;
759 }
760
761 self.model_request_usage
762 }
763
764 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
765 self.model_request_usage = Some(usage);
766 cx.notify();
767 }
768
769 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
770 self.edit_prediction_usage
771 }
772
773 pub fn update_edit_prediction_usage(
774 &mut self,
775 usage: EditPredictionUsage,
776 cx: &mut Context<Self>,
777 ) {
778 self.edit_prediction_usage = Some(usage);
779 cx.notify();
780 }
781
782 fn update_authenticated_user(
783 &mut self,
784 response: GetAuthenticatedUserResponse,
785 cx: &mut Context<Self>,
786 ) {
787 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
788 cx.update_flags(staff, response.feature_flags);
789 if let Some(client) = self.client.upgrade() {
790 client
791 .telemetry
792 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
793 }
794
795 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
796 limit: response.plan.usage.model_requests.limit,
797 amount: response.plan.usage.model_requests.used as i32,
798 }));
799 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
800 limit: response.plan.usage.edit_predictions.limit,
801 amount: response.plan.usage.edit_predictions.used as i32,
802 }));
803 self.plan_info = Some(response.plan);
804 cx.emit(Event::PrivateUserInfoUpdated);
805 }
806
807 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
808 cx.spawn(async move |cx| {
809 match message {
810 MessageToClient::UserUpdated => {
811 let cloud_client = cx
812 .update(|cx| {
813 this.read_with(cx, |this, _cx| {
814 this.client.upgrade().map(|client| client.cloud_client())
815 })
816 })??
817 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
818
819 let response = cloud_client.get_authenticated_user().await?;
820 cx.update(|cx| {
821 this.update(cx, |this, cx| {
822 this.update_authenticated_user(response, cx);
823 })
824 })??;
825 }
826 }
827
828 anyhow::Ok(())
829 })
830 .detach_and_log_err(cx);
831 }
832
833 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
834 self.current_user.clone()
835 }
836
837 fn load_users(
838 &self,
839 request: impl RequestMessage<Response = UsersResponse>,
840 cx: &Context<Self>,
841 ) -> Task<Result<Vec<Arc<User>>>> {
842 let client = self.client.clone();
843 cx.spawn(async move |this, cx| {
844 if let Some(rpc) = client.upgrade() {
845 let response = rpc.request(request).await.context("error loading users")?;
846 let users = response.users;
847
848 this.update(cx, |this, _| this.insert(users))
849 } else {
850 Ok(Vec::new())
851 }
852 })
853 }
854
855 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
856 let mut ret = Vec::with_capacity(users.len());
857 for user in users {
858 let user = User::new(user);
859 if let Some(old) = self.users.insert(user.id, user.clone())
860 && old.github_login != user.github_login
861 {
862 self.by_github_login.remove(&old.github_login);
863 }
864 self.by_github_login
865 .insert(user.github_login.clone(), user.id);
866 ret.push(user)
867 }
868 ret
869 }
870
871 pub fn set_participant_indices(
872 &mut self,
873 participant_indices: HashMap<u64, ParticipantIndex>,
874 cx: &mut Context<Self>,
875 ) {
876 if participant_indices != self.participant_indices {
877 self.participant_indices = participant_indices;
878 cx.emit(Event::ParticipantIndicesChanged);
879 }
880 }
881
882 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
883 &self.participant_indices
884 }
885
886 pub fn participant_names(
887 &self,
888 user_ids: impl Iterator<Item = u64>,
889 cx: &App,
890 ) -> HashMap<u64, SharedString> {
891 let mut ret = HashMap::default();
892 let mut missing_user_ids = Vec::new();
893 for id in user_ids {
894 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
895 ret.insert(id, github_login);
896 } else {
897 missing_user_ids.push(id)
898 }
899 }
900 if !missing_user_ids.is_empty() {
901 let this = self.weak_self.clone();
902 cx.spawn(async move |cx| {
903 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
904 .await
905 })
906 .detach_and_log_err(cx);
907 }
908 ret
909 }
910}
911
912impl User {
913 fn new(message: proto::User) -> Arc<Self> {
914 Arc::new(User {
915 id: message.id,
916 github_login: message.github_login.into(),
917 avatar_uri: message.avatar_url.into(),
918 name: message.name,
919 })
920 }
921}
922
923impl Contact {
924 async fn from_proto(
925 contact: proto::Contact,
926 user_store: &Entity<UserStore>,
927 cx: &mut AsyncApp,
928 ) -> Result<Self> {
929 let user = user_store
930 .update(cx, |user_store, cx| {
931 user_store.get_user(contact.user_id, cx)
932 })?
933 .await?;
934 Ok(Self {
935 user,
936 online: contact.online,
937 busy: contact.busy,
938 })
939 }
940}
941
942impl Collaborator {
943 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
944 Ok(Self {
945 peer_id: message.peer_id.context("invalid peer id")?,
946 replica_id: ReplicaId::new(message.replica_id as u16),
947 user_id: message.user_id as UserId,
948 is_host: message.is_host,
949 committer_name: message.committer_name,
950 committer_email: message.committer_email,
951 })
952 }
953}
954
955impl RequestUsage {
956 pub fn over_limit(&self) -> bool {
957 match self.limit {
958 UsageLimit::Limited(limit) => self.amount >= limit,
959 UsageLimit::Unlimited => false,
960 }
961 }
962
963 fn from_headers(
964 limit_name: &str,
965 amount_name: &str,
966 headers: &HeaderMap<HeaderValue>,
967 ) -> Result<Self> {
968 let limit = headers
969 .get(limit_name)
970 .with_context(|| format!("missing {limit_name:?} header"))?;
971 let limit = UsageLimit::from_str(limit.to_str()?)?;
972
973 let amount = headers
974 .get(amount_name)
975 .with_context(|| format!("missing {amount_name:?} header"))?;
976 let amount = amount.to_str()?.parse::<i32>()?;
977
978 Ok(Self { limit, amount })
979 }
980}
981
982impl ModelRequestUsage {
983 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
984 Ok(Self(RequestUsage::from_headers(
985 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
986 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
987 headers,
988 )?))
989 }
990}
991
992impl EditPredictionUsage {
993 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
994 Ok(Self(RequestUsage::from_headers(
995 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
996 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
997 headers,
998 )?))
999 }
1000}