1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
8 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
9};
10use collections::{HashMap, HashSet, hash_map::Entry};
11use derive_more::Deref;
12use feature_flags::FeatureFlagAppExt;
13use futures::{Future, StreamExt, channel::mpsc};
14use gpui::{
15 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
16};
17use http_client::http::{HeaderMap, HeaderValue};
18use postage::{sink::Sink, watch};
19use rpc::proto::{RequestMessage, UsersResponse};
20use std::{
21 str::FromStr as _,
22 sync::{Arc, Weak},
23};
24use text::ReplicaId;
25use util::{ResultExt, TryFutureExt as _};
26
27pub type UserId = u64;
28
29#[derive(
30 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
31)]
32pub struct ChannelId(pub u64);
33
34impl std::fmt::Display for ChannelId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 self.0.fmt(f)
37 }
38}
39
40#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
41pub struct ProjectId(pub u64);
42
43impl ProjectId {
44 pub fn to_proto(self) -> u64 {
45 self.0
46 }
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub struct ParticipantIndex(pub u32);
51
52#[derive(Default, Debug)]
53pub struct User {
54 pub id: UserId,
55 pub github_login: SharedString,
56 pub avatar_uri: SharedUri,
57 pub name: Option<String>,
58}
59
60#[derive(Clone, Debug, PartialEq, Eq)]
61pub struct Collaborator {
62 pub peer_id: proto::PeerId,
63 pub replica_id: ReplicaId,
64 pub user_id: UserId,
65 pub is_host: bool,
66 pub committer_name: Option<String>,
67 pub committer_email: Option<String>,
68}
69
70impl PartialOrd for User {
71 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
72 Some(self.cmp(other))
73 }
74}
75
76impl Ord for User {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 self.github_login.cmp(&other.github_login)
79 }
80}
81
82impl PartialEq for User {
83 fn eq(&self, other: &Self) -> bool {
84 self.id == other.id && self.github_login == other.github_login
85 }
86}
87
88impl Eq for User {}
89
90#[derive(Debug, PartialEq)]
91pub struct Contact {
92 pub user: Arc<User>,
93 pub online: bool,
94 pub busy: bool,
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum ContactRequestStatus {
99 None,
100 RequestSent,
101 RequestReceived,
102 RequestAccepted,
103}
104
105pub struct UserStore {
106 users: HashMap<u64, Arc<User>>,
107 by_github_login: HashMap<SharedString, u64>,
108 participant_indices: HashMap<u64, ParticipantIndex>,
109 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
110 model_request_usage: Option<ModelRequestUsage>,
111 edit_prediction_usage: Option<EditPredictionUsage>,
112 plan_info: Option<PlanInfo>,
113 current_user: watch::Receiver<Option<Arc<User>>>,
114 contacts: Vec<Arc<Contact>>,
115 incoming_contact_requests: Vec<Arc<User>>,
116 outgoing_contact_requests: Vec<Arc<User>>,
117 pending_contact_requests: HashMap<u64, usize>,
118 invite_info: Option<InviteInfo>,
119 client: Weak<Client>,
120 _maintain_contacts: Task<()>,
121 _maintain_current_user: Task<Result<()>>,
122 weak_self: WeakEntity<Self>,
123}
124
125#[derive(Clone)]
126pub struct InviteInfo {
127 pub count: u32,
128 pub url: Arc<str>,
129}
130
131pub enum Event {
132 Contact {
133 user: Arc<User>,
134 kind: ContactEventKind,
135 },
136 ShowContacts,
137 ParticipantIndicesChanged,
138 PrivateUserInfoUpdated,
139 PlanUpdated,
140}
141
142#[derive(Clone, Copy)]
143pub enum ContactEventKind {
144 Requested,
145 Accepted,
146 Cancelled,
147}
148
149impl EventEmitter<Event> for UserStore {}
150
151enum UpdateContacts {
152 Update(proto::UpdateContacts),
153 Wait(postage::barrier::Sender),
154 Clear(postage::barrier::Sender),
155}
156
157#[derive(Debug, Clone, Copy, Deref)]
158pub struct ModelRequestUsage(pub RequestUsage);
159
160#[derive(Debug, Clone, Copy, Deref)]
161pub struct EditPredictionUsage(pub RequestUsage);
162
163#[derive(Debug, Clone, Copy)]
164pub struct RequestUsage {
165 pub limit: UsageLimit,
166 pub amount: i32,
167}
168
169impl UserStore {
170 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
171 let (mut current_user_tx, current_user_rx) = watch::channel();
172 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
173 let rpc_subscriptions = vec![
174 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
175 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
176 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
177 ];
178
179 client.add_message_to_client_handler({
180 let this = cx.weak_entity();
181 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
182 });
183
184 Self {
185 users: Default::default(),
186 by_github_login: Default::default(),
187 current_user: current_user_rx,
188 plan_info: None,
189 model_request_usage: None,
190 edit_prediction_usage: None,
191 contacts: Default::default(),
192 incoming_contact_requests: Default::default(),
193 participant_indices: Default::default(),
194 outgoing_contact_requests: Default::default(),
195 invite_info: None,
196 client: Arc::downgrade(&client),
197 update_contacts_tx,
198 _maintain_contacts: cx.spawn(async move |this, cx| {
199 let _subscriptions = rpc_subscriptions;
200 while let Some(message) = update_contacts_rx.next().await {
201 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
202 {
203 task.log_err().await;
204 } else {
205 break;
206 }
207 }
208 }),
209 _maintain_current_user: cx.spawn(async move |this, cx| {
210 let mut status = client.status();
211 let weak = Arc::downgrade(&client);
212 drop(client);
213 while let Some(status) = status.next().await {
214 // if the client is dropped, the app is shutting down.
215 let Some(client) = weak.upgrade() else {
216 return Ok(());
217 };
218 match status {
219 Status::Authenticated
220 | Status::Reauthenticated
221 | Status::Connected { .. } => {
222 if let Some(user_id) = client.user_id() {
223 let response = client
224 .cloud_client()
225 .get_authenticated_user()
226 .await
227 .log_err();
228
229 let current_user_and_response = if let Some(response) = response {
230 let user = Arc::new(User {
231 id: user_id,
232 github_login: response.user.github_login.clone().into(),
233 avatar_uri: response.user.avatar_url.clone().into(),
234 name: response.user.name.clone(),
235 });
236
237 Some((user, response))
238 } else {
239 None
240 };
241 current_user_tx
242 .send(
243 current_user_and_response
244 .as_ref()
245 .map(|(user, _)| user.clone()),
246 )
247 .await
248 .ok();
249
250 cx.update(|cx| {
251 if let Some((user, response)) = current_user_and_response {
252 this.update(cx, |this, cx| {
253 this.by_github_login
254 .insert(user.github_login.clone(), user_id);
255 this.users.insert(user_id, user);
256 this.update_authenticated_user(response, cx)
257 })
258 } else {
259 anyhow::Ok(())
260 }
261 })??;
262
263 this.update(cx, |_, cx| cx.notify())?;
264 }
265 }
266 Status::SignedOut => {
267 current_user_tx.send(None).await.ok();
268 this.update(cx, |this, cx| {
269 cx.emit(Event::PrivateUserInfoUpdated);
270 cx.notify();
271 this.clear_contacts()
272 })?
273 .await;
274 }
275 Status::ConnectionLost => {
276 this.update(cx, |this, cx| {
277 cx.notify();
278 this.clear_contacts()
279 })?
280 .await;
281 }
282 _ => {}
283 }
284 }
285 Ok(())
286 }),
287 pending_contact_requests: Default::default(),
288 weak_self: cx.weak_entity(),
289 }
290 }
291
292 #[cfg(feature = "test-support")]
293 pub fn clear_cache(&mut self) {
294 self.users.clear();
295 self.by_github_login.clear();
296 }
297
298 async fn handle_update_invite_info(
299 this: Entity<Self>,
300 message: TypedEnvelope<proto::UpdateInviteInfo>,
301 mut cx: AsyncApp,
302 ) -> Result<()> {
303 this.update(&mut cx, |this, cx| {
304 this.invite_info = Some(InviteInfo {
305 url: Arc::from(message.payload.url),
306 count: message.payload.count,
307 });
308 cx.notify();
309 })?;
310 Ok(())
311 }
312
313 async fn handle_show_contacts(
314 this: Entity<Self>,
315 _: TypedEnvelope<proto::ShowContacts>,
316 mut cx: AsyncApp,
317 ) -> Result<()> {
318 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
319 Ok(())
320 }
321
322 pub fn invite_info(&self) -> Option<&InviteInfo> {
323 self.invite_info.as_ref()
324 }
325
326 async fn handle_update_contacts(
327 this: Entity<Self>,
328 message: TypedEnvelope<proto::UpdateContacts>,
329 cx: AsyncApp,
330 ) -> Result<()> {
331 this.read_with(&cx, |this, _| {
332 this.update_contacts_tx
333 .unbounded_send(UpdateContacts::Update(message.payload))
334 .unwrap();
335 })?;
336 Ok(())
337 }
338
339 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
340 match message {
341 UpdateContacts::Wait(barrier) => {
342 drop(barrier);
343 Task::ready(Ok(()))
344 }
345 UpdateContacts::Clear(barrier) => {
346 self.contacts.clear();
347 self.incoming_contact_requests.clear();
348 self.outgoing_contact_requests.clear();
349 drop(barrier);
350 Task::ready(Ok(()))
351 }
352 UpdateContacts::Update(message) => {
353 let mut user_ids = HashSet::default();
354 for contact in &message.contacts {
355 user_ids.insert(contact.user_id);
356 }
357 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
358 user_ids.extend(message.outgoing_requests.iter());
359
360 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
361 cx.spawn(async move |this, cx| {
362 load_users.await?;
363
364 // Users are fetched in parallel above and cached in call to get_users
365 // No need to parallelize here
366 let mut updated_contacts = Vec::new();
367 let this = this.upgrade().context("can't upgrade user store handle")?;
368 for contact in message.contacts {
369 updated_contacts
370 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
371 }
372
373 let mut incoming_requests = Vec::new();
374 for request in message.incoming_requests {
375 incoming_requests.push({
376 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
377 .await?
378 });
379 }
380
381 let mut outgoing_requests = Vec::new();
382 for requested_user_id in message.outgoing_requests {
383 outgoing_requests.push(
384 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
385 .await?,
386 );
387 }
388
389 let removed_contacts =
390 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
391 let removed_incoming_requests =
392 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
393 let removed_outgoing_requests =
394 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
395
396 this.update(cx, |this, cx| {
397 // Remove contacts
398 this.contacts
399 .retain(|contact| !removed_contacts.contains(&contact.user.id));
400 // Update existing contacts and insert new ones
401 for updated_contact in updated_contacts {
402 match this.contacts.binary_search_by_key(
403 &&updated_contact.user.github_login,
404 |contact| &contact.user.github_login,
405 ) {
406 Ok(ix) => this.contacts[ix] = updated_contact,
407 Err(ix) => this.contacts.insert(ix, updated_contact),
408 }
409 }
410
411 // Remove incoming contact requests
412 this.incoming_contact_requests.retain(|user| {
413 if removed_incoming_requests.contains(&user.id) {
414 cx.emit(Event::Contact {
415 user: user.clone(),
416 kind: ContactEventKind::Cancelled,
417 });
418 false
419 } else {
420 true
421 }
422 });
423 // Update existing incoming requests and insert new ones
424 for user in incoming_requests {
425 match this
426 .incoming_contact_requests
427 .binary_search_by_key(&&user.github_login, |contact| {
428 &contact.github_login
429 }) {
430 Ok(ix) => this.incoming_contact_requests[ix] = user,
431 Err(ix) => this.incoming_contact_requests.insert(ix, user),
432 }
433 }
434
435 // Remove outgoing contact requests
436 this.outgoing_contact_requests
437 .retain(|user| !removed_outgoing_requests.contains(&user.id));
438 // Update existing incoming requests and insert new ones
439 for request in outgoing_requests {
440 match this
441 .outgoing_contact_requests
442 .binary_search_by_key(&&request.github_login, |contact| {
443 &contact.github_login
444 }) {
445 Ok(ix) => this.outgoing_contact_requests[ix] = request,
446 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
447 }
448 }
449
450 cx.notify();
451 })?;
452
453 Ok(())
454 })
455 }
456 }
457 }
458
459 pub fn contacts(&self) -> &[Arc<Contact>] {
460 &self.contacts
461 }
462
463 pub fn has_contact(&self, user: &Arc<User>) -> bool {
464 self.contacts
465 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
466 .is_ok()
467 }
468
469 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
470 &self.incoming_contact_requests
471 }
472
473 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
474 &self.outgoing_contact_requests
475 }
476
477 pub fn is_contact_request_pending(&self, user: &User) -> bool {
478 self.pending_contact_requests.contains_key(&user.id)
479 }
480
481 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
482 if self
483 .contacts
484 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
485 .is_ok()
486 {
487 ContactRequestStatus::RequestAccepted
488 } else if self
489 .outgoing_contact_requests
490 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
491 .is_ok()
492 {
493 ContactRequestStatus::RequestSent
494 } else if self
495 .incoming_contact_requests
496 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
497 .is_ok()
498 {
499 ContactRequestStatus::RequestReceived
500 } else {
501 ContactRequestStatus::None
502 }
503 }
504
505 pub fn request_contact(
506 &mut self,
507 responder_id: u64,
508 cx: &mut Context<Self>,
509 ) -> Task<Result<()>> {
510 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
511 }
512
513 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
514 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
515 }
516
517 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
518 self.incoming_contact_requests
519 .iter()
520 .any(|user| user.id == user_id)
521 }
522
523 pub fn respond_to_contact_request(
524 &mut self,
525 requester_id: u64,
526 accept: bool,
527 cx: &mut Context<Self>,
528 ) -> Task<Result<()>> {
529 self.perform_contact_request(
530 requester_id,
531 proto::RespondToContactRequest {
532 requester_id,
533 response: if accept {
534 proto::ContactRequestResponse::Accept
535 } else {
536 proto::ContactRequestResponse::Decline
537 } as i32,
538 },
539 cx,
540 )
541 }
542
543 pub fn dismiss_contact_request(
544 &self,
545 requester_id: u64,
546 cx: &Context<Self>,
547 ) -> Task<Result<()>> {
548 let client = self.client.upgrade();
549 cx.spawn(async move |_, _| {
550 client
551 .context("can't upgrade client reference")?
552 .request(proto::RespondToContactRequest {
553 requester_id,
554 response: proto::ContactRequestResponse::Dismiss as i32,
555 })
556 .await?;
557 Ok(())
558 })
559 }
560
561 fn perform_contact_request<T: RequestMessage>(
562 &mut self,
563 user_id: u64,
564 request: T,
565 cx: &mut Context<Self>,
566 ) -> Task<Result<()>> {
567 let client = self.client.upgrade();
568 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
569 cx.notify();
570
571 cx.spawn(async move |this, cx| {
572 let response = client
573 .context("can't upgrade client reference")?
574 .request(request)
575 .await;
576 this.update(cx, |this, cx| {
577 if let Entry::Occupied(mut request_count) =
578 this.pending_contact_requests.entry(user_id)
579 {
580 *request_count.get_mut() -= 1;
581 if *request_count.get() == 0 {
582 request_count.remove();
583 }
584 }
585 cx.notify();
586 })?;
587 response?;
588 Ok(())
589 })
590 }
591
592 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
593 let (tx, mut rx) = postage::barrier::channel();
594 self.update_contacts_tx
595 .unbounded_send(UpdateContacts::Clear(tx))
596 .unwrap();
597 async move {
598 rx.next().await;
599 }
600 }
601
602 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
603 let (tx, mut rx) = postage::barrier::channel();
604 self.update_contacts_tx
605 .unbounded_send(UpdateContacts::Wait(tx))
606 .unwrap();
607 async move {
608 rx.next().await;
609 }
610 }
611
612 pub fn get_users(
613 &self,
614 user_ids: Vec<u64>,
615 cx: &Context<Self>,
616 ) -> Task<Result<Vec<Arc<User>>>> {
617 let mut user_ids_to_fetch = user_ids.clone();
618 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
619
620 cx.spawn(async move |this, cx| {
621 if !user_ids_to_fetch.is_empty() {
622 this.update(cx, |this, cx| {
623 this.load_users(
624 proto::GetUsers {
625 user_ids: user_ids_to_fetch,
626 },
627 cx,
628 )
629 })?
630 .await?;
631 }
632
633 this.read_with(cx, |this, _| {
634 user_ids
635 .iter()
636 .map(|user_id| {
637 this.users
638 .get(user_id)
639 .cloned()
640 .with_context(|| format!("user {user_id} not found"))
641 })
642 .collect()
643 })?
644 })
645 }
646
647 pub fn fuzzy_search_users(
648 &self,
649 query: String,
650 cx: &Context<Self>,
651 ) -> Task<Result<Vec<Arc<User>>>> {
652 self.load_users(proto::FuzzySearchUsers { query }, cx)
653 }
654
655 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
656 self.users.get(&user_id).cloned()
657 }
658
659 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
660 if let Some(user) = self.users.get(&user_id).cloned() {
661 return Some(user);
662 }
663
664 self.get_user(user_id, cx).detach_and_log_err(cx);
665 None
666 }
667
668 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
669 if let Some(user) = self.users.get(&user_id).cloned() {
670 return Task::ready(Ok(user));
671 }
672
673 let load_users = self.get_users(vec![user_id], cx);
674 cx.spawn(async move |this, cx| {
675 load_users.await?;
676 this.read_with(cx, |this, _| {
677 this.users
678 .get(&user_id)
679 .cloned()
680 .context("server responded with no users")
681 })?
682 })
683 }
684
685 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
686 self.by_github_login
687 .get(github_login)
688 .and_then(|id| self.users.get(id).cloned())
689 }
690
691 pub fn current_user(&self) -> Option<Arc<User>> {
692 self.current_user.borrow().clone()
693 }
694
695 pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
696 #[cfg(debug_assertions)]
697 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
698 return match plan.as_str() {
699 "free" => Some(cloud_llm_client::Plan::ZedFree),
700 "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
701 "pro" => Some(cloud_llm_client::Plan::ZedPro),
702 _ => {
703 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
704 }
705 };
706 }
707
708 self.plan_info.as_ref().map(|info| info.plan)
709 }
710
711 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
712 self.plan_info
713 .as_ref()
714 .and_then(|plan| plan.subscription_period)
715 .map(|subscription_period| {
716 (
717 subscription_period.started_at.0,
718 subscription_period.ended_at.0,
719 )
720 })
721 }
722
723 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
724 self.plan_info
725 .as_ref()
726 .and_then(|plan| plan.trial_started_at)
727 .map(|trial_started_at| trial_started_at.0)
728 }
729
730 /// Returns whether the user's account is too new to use the service.
731 pub fn account_too_young(&self) -> bool {
732 self.plan_info
733 .as_ref()
734 .map(|plan| plan.is_account_too_young)
735 .unwrap_or_default()
736 }
737
738 /// Returns whether the current user has overdue invoices and usage should be blocked.
739 pub fn has_overdue_invoices(&self) -> bool {
740 self.plan_info
741 .as_ref()
742 .map(|plan| plan.has_overdue_invoices)
743 .unwrap_or_default()
744 }
745
746 pub fn is_usage_based_billing_enabled(&self) -> bool {
747 self.plan_info
748 .as_ref()
749 .map(|plan| plan.is_usage_based_billing_enabled)
750 .unwrap_or_default()
751 }
752
753 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
754 self.model_request_usage
755 }
756
757 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
758 self.model_request_usage = Some(usage);
759 cx.notify();
760 }
761
762 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
763 self.edit_prediction_usage
764 }
765
766 pub fn update_edit_prediction_usage(
767 &mut self,
768 usage: EditPredictionUsage,
769 cx: &mut Context<Self>,
770 ) {
771 self.edit_prediction_usage = Some(usage);
772 cx.notify();
773 }
774
775 fn update_authenticated_user(
776 &mut self,
777 response: GetAuthenticatedUserResponse,
778 cx: &mut Context<Self>,
779 ) {
780 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
781 cx.update_flags(staff, response.feature_flags);
782 if let Some(client) = self.client.upgrade() {
783 client
784 .telemetry
785 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
786 }
787
788 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
789 limit: response.plan.usage.model_requests.limit,
790 amount: response.plan.usage.model_requests.used as i32,
791 }));
792 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
793 limit: response.plan.usage.edit_predictions.limit,
794 amount: response.plan.usage.edit_predictions.used as i32,
795 }));
796 self.plan_info = Some(response.plan);
797 cx.emit(Event::PrivateUserInfoUpdated);
798 }
799
800 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
801 cx.spawn(async move |cx| {
802 match message {
803 MessageToClient::UserUpdated => {
804 let cloud_client = cx
805 .update(|cx| {
806 this.read_with(cx, |this, _cx| {
807 this.client.upgrade().map(|client| client.cloud_client())
808 })
809 })??
810 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
811
812 let response = cloud_client.get_authenticated_user().await?;
813 cx.update(|cx| {
814 this.update(cx, |this, cx| {
815 this.update_authenticated_user(response, cx);
816 })
817 })??;
818 }
819 }
820
821 anyhow::Ok(())
822 })
823 .detach_and_log_err(cx);
824 }
825
826 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
827 self.current_user.clone()
828 }
829
830 fn load_users(
831 &self,
832 request: impl RequestMessage<Response = UsersResponse>,
833 cx: &Context<Self>,
834 ) -> Task<Result<Vec<Arc<User>>>> {
835 let client = self.client.clone();
836 cx.spawn(async move |this, cx| {
837 if let Some(rpc) = client.upgrade() {
838 let response = rpc.request(request).await.context("error loading users")?;
839 let users = response.users;
840
841 this.update(cx, |this, _| this.insert(users))
842 } else {
843 Ok(Vec::new())
844 }
845 })
846 }
847
848 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
849 let mut ret = Vec::with_capacity(users.len());
850 for user in users {
851 let user = User::new(user);
852 if let Some(old) = self.users.insert(user.id, user.clone())
853 && old.github_login != user.github_login
854 {
855 self.by_github_login.remove(&old.github_login);
856 }
857 self.by_github_login
858 .insert(user.github_login.clone(), user.id);
859 ret.push(user)
860 }
861 ret
862 }
863
864 pub fn set_participant_indices(
865 &mut self,
866 participant_indices: HashMap<u64, ParticipantIndex>,
867 cx: &mut Context<Self>,
868 ) {
869 if participant_indices != self.participant_indices {
870 self.participant_indices = participant_indices;
871 cx.emit(Event::ParticipantIndicesChanged);
872 }
873 }
874
875 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
876 &self.participant_indices
877 }
878
879 pub fn participant_names(
880 &self,
881 user_ids: impl Iterator<Item = u64>,
882 cx: &App,
883 ) -> HashMap<u64, SharedString> {
884 let mut ret = HashMap::default();
885 let mut missing_user_ids = Vec::new();
886 for id in user_ids {
887 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
888 ret.insert(id, github_login);
889 } else {
890 missing_user_ids.push(id)
891 }
892 }
893 if !missing_user_ids.is_empty() {
894 let this = self.weak_self.clone();
895 cx.spawn(async move |cx| {
896 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
897 .await
898 })
899 .detach_and_log_err(cx);
900 }
901 ret
902 }
903}
904
905impl User {
906 fn new(message: proto::User) -> Arc<Self> {
907 Arc::new(User {
908 id: message.id,
909 github_login: message.github_login.into(),
910 avatar_uri: message.avatar_url.into(),
911 name: message.name,
912 })
913 }
914}
915
916impl Contact {
917 async fn from_proto(
918 contact: proto::Contact,
919 user_store: &Entity<UserStore>,
920 cx: &mut AsyncApp,
921 ) -> Result<Self> {
922 let user = user_store
923 .update(cx, |user_store, cx| {
924 user_store.get_user(contact.user_id, cx)
925 })?
926 .await?;
927 Ok(Self {
928 user,
929 online: contact.online,
930 busy: contact.busy,
931 })
932 }
933}
934
935impl Collaborator {
936 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
937 Ok(Self {
938 peer_id: message.peer_id.context("invalid peer id")?,
939 replica_id: message.replica_id as ReplicaId,
940 user_id: message.user_id as UserId,
941 is_host: message.is_host,
942 committer_name: message.committer_name,
943 committer_email: message.committer_email,
944 })
945 }
946}
947
948impl RequestUsage {
949 pub fn over_limit(&self) -> bool {
950 match self.limit {
951 UsageLimit::Limited(limit) => self.amount >= limit,
952 UsageLimit::Unlimited => false,
953 }
954 }
955
956 fn from_headers(
957 limit_name: &str,
958 amount_name: &str,
959 headers: &HeaderMap<HeaderValue>,
960 ) -> Result<Self> {
961 let limit = headers
962 .get(limit_name)
963 .with_context(|| format!("missing {limit_name:?} header"))?;
964 let limit = UsageLimit::from_str(limit.to_str()?)?;
965
966 let amount = headers
967 .get(amount_name)
968 .with_context(|| format!("missing {amount_name:?} header"))?;
969 let amount = amount.to_str()?.parse::<i32>()?;
970
971 Ok(Self { limit, amount })
972 }
973}
974
975impl ModelRequestUsage {
976 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
977 Ok(Self(RequestUsage::from_headers(
978 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
979 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
980 headers,
981 )?))
982 }
983}
984
985impl EditPredictionUsage {
986 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
987 Ok(Self(RequestUsage::from_headers(
988 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
989 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
990 headers,
991 )?))
992 }
993}