1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
8 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, Plan,
9 UsageLimit,
10};
11use collections::{HashMap, HashSet, hash_map::Entry};
12use derive_more::Deref;
13use feature_flags::FeatureFlagAppExt;
14use futures::{Future, StreamExt, channel::mpsc};
15use gpui::{
16 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
17};
18use http_client::http::{HeaderMap, HeaderValue};
19use postage::{sink::Sink, watch};
20use rpc::proto::{RequestMessage, UsersResponse};
21use std::{
22 str::FromStr as _,
23 sync::{Arc, Weak},
24};
25use text::ReplicaId;
26use util::{ResultExt, TryFutureExt as _};
27
28pub type UserId = u64;
29
30#[derive(
31 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
32)]
33pub struct ChannelId(pub u64);
34
35impl std::fmt::Display for ChannelId {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 self.0.fmt(f)
38 }
39}
40
41#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
42pub struct ProjectId(pub u64);
43
44impl ProjectId {
45 pub fn to_proto(self) -> u64 {
46 self.0
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub struct ParticipantIndex(pub u32);
52
53#[derive(Default, Debug)]
54pub struct User {
55 pub id: UserId,
56 pub github_login: SharedString,
57 pub avatar_uri: SharedUri,
58 pub name: Option<String>,
59}
60
61#[derive(Clone, Debug, PartialEq, Eq)]
62pub struct Collaborator {
63 pub peer_id: proto::PeerId,
64 pub replica_id: ReplicaId,
65 pub user_id: UserId,
66 pub is_host: bool,
67 pub committer_name: Option<String>,
68 pub committer_email: Option<String>,
69}
70
71impl PartialOrd for User {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
77impl Ord for User {
78 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
79 self.github_login.cmp(&other.github_login)
80 }
81}
82
83impl PartialEq for User {
84 fn eq(&self, other: &Self) -> bool {
85 self.id == other.id && self.github_login == other.github_login
86 }
87}
88
89impl Eq for User {}
90
91#[derive(Debug, PartialEq)]
92pub struct Contact {
93 pub user: Arc<User>,
94 pub online: bool,
95 pub busy: bool,
96}
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub enum ContactRequestStatus {
100 None,
101 RequestSent,
102 RequestReceived,
103 RequestAccepted,
104}
105
106pub struct UserStore {
107 users: HashMap<u64, Arc<User>>,
108 by_github_login: HashMap<SharedString, u64>,
109 participant_indices: HashMap<u64, ParticipantIndex>,
110 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
111 model_request_usage: Option<ModelRequestUsage>,
112 edit_prediction_usage: Option<EditPredictionUsage>,
113 plan_info: Option<PlanInfo>,
114 current_user: watch::Receiver<Option<Arc<User>>>,
115 contacts: Vec<Arc<Contact>>,
116 incoming_contact_requests: Vec<Arc<User>>,
117 outgoing_contact_requests: Vec<Arc<User>>,
118 pending_contact_requests: HashMap<u64, usize>,
119 client: Weak<Client>,
120 _maintain_contacts: Task<()>,
121 _maintain_current_user: Task<Result<()>>,
122 weak_self: WeakEntity<Self>,
123}
124
125#[derive(Clone)]
126pub struct InviteInfo {
127 pub count: u32,
128 pub url: Arc<str>,
129}
130
131pub enum Event {
132 Contact {
133 user: Arc<User>,
134 kind: ContactEventKind,
135 },
136 ShowContacts,
137 ParticipantIndicesChanged,
138 PrivateUserInfoUpdated,
139 PlanUpdated,
140}
141
142#[derive(Clone, Copy)]
143pub enum ContactEventKind {
144 Requested,
145 Accepted,
146 Cancelled,
147}
148
149impl EventEmitter<Event> for UserStore {}
150
151enum UpdateContacts {
152 Update(proto::UpdateContacts),
153 Wait(postage::barrier::Sender),
154 Clear(postage::barrier::Sender),
155}
156
157#[derive(Debug, Clone, Copy, Deref)]
158pub struct ModelRequestUsage(pub RequestUsage);
159
160#[derive(Debug, Clone, Copy, Deref)]
161pub struct EditPredictionUsage(pub RequestUsage);
162
163#[derive(Debug, Clone, Copy)]
164pub struct RequestUsage {
165 pub limit: UsageLimit,
166 pub amount: i32,
167}
168
169impl UserStore {
170 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
171 let (mut current_user_tx, current_user_rx) = watch::channel();
172 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
173 let rpc_subscriptions = vec![
174 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
175 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
176 ];
177
178 client.add_message_to_client_handler({
179 let this = cx.weak_entity();
180 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
181 });
182
183 Self {
184 users: Default::default(),
185 by_github_login: Default::default(),
186 current_user: current_user_rx,
187 plan_info: None,
188 model_request_usage: None,
189 edit_prediction_usage: None,
190 contacts: Default::default(),
191 incoming_contact_requests: Default::default(),
192 participant_indices: Default::default(),
193 outgoing_contact_requests: Default::default(),
194 client: Arc::downgrade(&client),
195 update_contacts_tx,
196 _maintain_contacts: cx.spawn(async move |this, cx| {
197 let _subscriptions = rpc_subscriptions;
198 while let Some(message) = update_contacts_rx.next().await {
199 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
200 {
201 task.log_err().await;
202 } else {
203 break;
204 }
205 }
206 }),
207 _maintain_current_user: cx.spawn(async move |this, cx| {
208 let mut status = client.status();
209 let weak = Arc::downgrade(&client);
210 drop(client);
211 while let Some(status) = status.next().await {
212 // if the client is dropped, the app is shutting down.
213 let Some(client) = weak.upgrade() else {
214 return Ok(());
215 };
216 match status {
217 Status::Authenticated
218 | Status::Reauthenticated
219 | Status::Connected { .. } => {
220 if let Some(user_id) = client.user_id() {
221 let response = client
222 .cloud_client()
223 .get_authenticated_user()
224 .await
225 .log_err();
226
227 let current_user_and_response = if let Some(response) = response {
228 let user = Arc::new(User {
229 id: user_id,
230 github_login: response.user.github_login.clone().into(),
231 avatar_uri: response.user.avatar_url.clone().into(),
232 name: response.user.name.clone(),
233 });
234
235 Some((user, response))
236 } else {
237 None
238 };
239 current_user_tx
240 .send(
241 current_user_and_response
242 .as_ref()
243 .map(|(user, _)| user.clone()),
244 )
245 .await
246 .ok();
247
248 cx.update(|cx| {
249 if let Some((user, response)) = current_user_and_response {
250 this.update(cx, |this, cx| {
251 this.by_github_login
252 .insert(user.github_login.clone(), user_id);
253 this.users.insert(user_id, user);
254 this.update_authenticated_user(response, cx)
255 })
256 } else {
257 anyhow::Ok(())
258 }
259 })?;
260
261 this.update(cx, |_, cx| cx.notify())?;
262 }
263 }
264 Status::SignedOut => {
265 current_user_tx.send(None).await.ok();
266 this.update(cx, |this, cx| {
267 this.clear_plan_and_usage();
268 cx.emit(Event::PrivateUserInfoUpdated);
269 cx.notify();
270 this.clear_contacts()
271 })?
272 .await;
273 }
274 Status::ConnectionLost => {
275 this.update(cx, |this, cx| {
276 cx.notify();
277 this.clear_contacts()
278 })?
279 .await;
280 }
281 _ => {}
282 }
283 }
284 Ok(())
285 }),
286 pending_contact_requests: Default::default(),
287 weak_self: cx.weak_entity(),
288 }
289 }
290
291 #[cfg(feature = "test-support")]
292 pub fn clear_cache(&mut self) {
293 self.users.clear();
294 self.by_github_login.clear();
295 }
296
297 async fn handle_show_contacts(
298 this: Entity<Self>,
299 _: TypedEnvelope<proto::ShowContacts>,
300 mut cx: AsyncApp,
301 ) -> Result<()> {
302 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
303 Ok(())
304 }
305
306 async fn handle_update_contacts(
307 this: Entity<Self>,
308 message: TypedEnvelope<proto::UpdateContacts>,
309 cx: AsyncApp,
310 ) -> Result<()> {
311 this.read_with(&cx, |this, _| {
312 this.update_contacts_tx
313 .unbounded_send(UpdateContacts::Update(message.payload))
314 .unwrap();
315 });
316 Ok(())
317 }
318
319 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
320 match message {
321 UpdateContacts::Wait(barrier) => {
322 drop(barrier);
323 Task::ready(Ok(()))
324 }
325 UpdateContacts::Clear(barrier) => {
326 self.contacts.clear();
327 self.incoming_contact_requests.clear();
328 self.outgoing_contact_requests.clear();
329 drop(barrier);
330 Task::ready(Ok(()))
331 }
332 UpdateContacts::Update(message) => {
333 let mut user_ids = HashSet::default();
334 for contact in &message.contacts {
335 user_ids.insert(contact.user_id);
336 }
337 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
338 user_ids.extend(message.outgoing_requests.iter());
339
340 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
341 cx.spawn(async move |this, cx| {
342 load_users.await?;
343
344 // Users are fetched in parallel above and cached in call to get_users
345 // No need to parallelize here
346 let mut updated_contacts = Vec::new();
347 let this = this.upgrade().context("can't upgrade user store handle")?;
348 for contact in message.contacts {
349 updated_contacts
350 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
351 }
352
353 let mut incoming_requests = Vec::new();
354 for request in message.incoming_requests {
355 incoming_requests.push({
356 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
357 .await?
358 });
359 }
360
361 let mut outgoing_requests = Vec::new();
362 for requested_user_id in message.outgoing_requests {
363 outgoing_requests.push(
364 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
365 .await?,
366 );
367 }
368
369 let removed_contacts =
370 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
371 let removed_incoming_requests =
372 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
373 let removed_outgoing_requests =
374 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
375
376 this.update(cx, |this, cx| {
377 // Remove contacts
378 this.contacts
379 .retain(|contact| !removed_contacts.contains(&contact.user.id));
380 // Update existing contacts and insert new ones
381 for updated_contact in updated_contacts {
382 match this.contacts.binary_search_by_key(
383 &&updated_contact.user.github_login,
384 |contact| &contact.user.github_login,
385 ) {
386 Ok(ix) => this.contacts[ix] = updated_contact,
387 Err(ix) => this.contacts.insert(ix, updated_contact),
388 }
389 }
390
391 // Remove incoming contact requests
392 this.incoming_contact_requests.retain(|user| {
393 if removed_incoming_requests.contains(&user.id) {
394 cx.emit(Event::Contact {
395 user: user.clone(),
396 kind: ContactEventKind::Cancelled,
397 });
398 false
399 } else {
400 true
401 }
402 });
403 // Update existing incoming requests and insert new ones
404 for user in incoming_requests {
405 match this
406 .incoming_contact_requests
407 .binary_search_by_key(&&user.github_login, |contact| {
408 &contact.github_login
409 }) {
410 Ok(ix) => this.incoming_contact_requests[ix] = user,
411 Err(ix) => this.incoming_contact_requests.insert(ix, user),
412 }
413 }
414
415 // Remove outgoing contact requests
416 this.outgoing_contact_requests
417 .retain(|user| !removed_outgoing_requests.contains(&user.id));
418 // Update existing incoming requests and insert new ones
419 for request in outgoing_requests {
420 match this
421 .outgoing_contact_requests
422 .binary_search_by_key(&&request.github_login, |contact| {
423 &contact.github_login
424 }) {
425 Ok(ix) => this.outgoing_contact_requests[ix] = request,
426 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
427 }
428 }
429
430 cx.notify();
431 });
432
433 Ok(())
434 })
435 }
436 }
437 }
438
439 pub fn contacts(&self) -> &[Arc<Contact>] {
440 &self.contacts
441 }
442
443 pub fn has_contact(&self, user: &Arc<User>) -> bool {
444 self.contacts
445 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
446 .is_ok()
447 }
448
449 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
450 &self.incoming_contact_requests
451 }
452
453 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
454 &self.outgoing_contact_requests
455 }
456
457 pub fn is_contact_request_pending(&self, user: &User) -> bool {
458 self.pending_contact_requests.contains_key(&user.id)
459 }
460
461 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
462 if self
463 .contacts
464 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
465 .is_ok()
466 {
467 ContactRequestStatus::RequestAccepted
468 } else if self
469 .outgoing_contact_requests
470 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
471 .is_ok()
472 {
473 ContactRequestStatus::RequestSent
474 } else if self
475 .incoming_contact_requests
476 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
477 .is_ok()
478 {
479 ContactRequestStatus::RequestReceived
480 } else {
481 ContactRequestStatus::None
482 }
483 }
484
485 pub fn request_contact(
486 &mut self,
487 responder_id: u64,
488 cx: &mut Context<Self>,
489 ) -> Task<Result<()>> {
490 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
491 }
492
493 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
494 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
495 }
496
497 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
498 self.incoming_contact_requests
499 .iter()
500 .any(|user| user.id == user_id)
501 }
502
503 pub fn respond_to_contact_request(
504 &mut self,
505 requester_id: u64,
506 accept: bool,
507 cx: &mut Context<Self>,
508 ) -> Task<Result<()>> {
509 self.perform_contact_request(
510 requester_id,
511 proto::RespondToContactRequest {
512 requester_id,
513 response: if accept {
514 proto::ContactRequestResponse::Accept
515 } else {
516 proto::ContactRequestResponse::Decline
517 } as i32,
518 },
519 cx,
520 )
521 }
522
523 pub fn dismiss_contact_request(
524 &self,
525 requester_id: u64,
526 cx: &Context<Self>,
527 ) -> Task<Result<()>> {
528 let client = self.client.upgrade();
529 cx.spawn(async move |_, _| {
530 client
531 .context("can't upgrade client reference")?
532 .request(proto::RespondToContactRequest {
533 requester_id,
534 response: proto::ContactRequestResponse::Dismiss as i32,
535 })
536 .await?;
537 Ok(())
538 })
539 }
540
541 fn perform_contact_request<T: RequestMessage>(
542 &mut self,
543 user_id: u64,
544 request: T,
545 cx: &mut Context<Self>,
546 ) -> Task<Result<()>> {
547 let client = self.client.upgrade();
548 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
549 cx.notify();
550
551 cx.spawn(async move |this, cx| {
552 let response = client
553 .context("can't upgrade client reference")?
554 .request(request)
555 .await;
556 this.update(cx, |this, cx| {
557 if let Entry::Occupied(mut request_count) =
558 this.pending_contact_requests.entry(user_id)
559 {
560 *request_count.get_mut() -= 1;
561 if *request_count.get() == 0 {
562 request_count.remove();
563 }
564 }
565 cx.notify();
566 })?;
567 response?;
568 Ok(())
569 })
570 }
571
572 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
573 let (tx, mut rx) = postage::barrier::channel();
574 self.update_contacts_tx
575 .unbounded_send(UpdateContacts::Clear(tx))
576 .unwrap();
577 async move {
578 rx.next().await;
579 }
580 }
581
582 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
583 let (tx, mut rx) = postage::barrier::channel();
584 self.update_contacts_tx
585 .unbounded_send(UpdateContacts::Wait(tx))
586 .unwrap();
587 async move {
588 rx.next().await;
589 }
590 }
591
592 pub fn get_users(
593 &self,
594 user_ids: Vec<u64>,
595 cx: &Context<Self>,
596 ) -> Task<Result<Vec<Arc<User>>>> {
597 let mut user_ids_to_fetch = user_ids.clone();
598 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
599
600 cx.spawn(async move |this, cx| {
601 if !user_ids_to_fetch.is_empty() {
602 this.update(cx, |this, cx| {
603 this.load_users(
604 proto::GetUsers {
605 user_ids: user_ids_to_fetch,
606 },
607 cx,
608 )
609 })?
610 .await?;
611 }
612
613 this.read_with(cx, |this, _| {
614 user_ids
615 .iter()
616 .map(|user_id| {
617 this.users
618 .get(user_id)
619 .cloned()
620 .with_context(|| format!("user {user_id} not found"))
621 })
622 .collect()
623 })?
624 })
625 }
626
627 pub fn fuzzy_search_users(
628 &self,
629 query: String,
630 cx: &Context<Self>,
631 ) -> Task<Result<Vec<Arc<User>>>> {
632 self.load_users(proto::FuzzySearchUsers { query }, cx)
633 }
634
635 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
636 self.users.get(&user_id).cloned()
637 }
638
639 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
640 if let Some(user) = self.users.get(&user_id).cloned() {
641 return Some(user);
642 }
643
644 self.get_user(user_id, cx).detach_and_log_err(cx);
645 None
646 }
647
648 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
649 if let Some(user) = self.users.get(&user_id).cloned() {
650 return Task::ready(Ok(user));
651 }
652
653 let load_users = self.get_users(vec![user_id], cx);
654 cx.spawn(async move |this, cx| {
655 load_users.await?;
656 this.read_with(cx, |this, _| {
657 this.users
658 .get(&user_id)
659 .cloned()
660 .context("server responded with no users")
661 })?
662 })
663 }
664
665 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
666 self.by_github_login
667 .get(github_login)
668 .and_then(|id| self.users.get(id).cloned())
669 }
670
671 pub fn current_user(&self) -> Option<Arc<User>> {
672 self.current_user.borrow().clone()
673 }
674
675 pub fn plan(&self) -> Option<Plan> {
676 #[cfg(debug_assertions)]
677 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
678 use cloud_llm_client::PlanV1;
679
680 return match plan.as_str() {
681 "free" => Some(Plan::V1(PlanV1::ZedFree)),
682 "trial" => Some(Plan::V1(PlanV1::ZedProTrial)),
683 "pro" => Some(Plan::V1(PlanV1::ZedPro)),
684 _ => {
685 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
686 }
687 };
688 }
689
690 self.plan_info.as_ref().map(|info| info.plan())
691 }
692
693 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
694 self.plan_info
695 .as_ref()
696 .and_then(|plan| plan.subscription_period)
697 .map(|subscription_period| {
698 (
699 subscription_period.started_at.0,
700 subscription_period.ended_at.0,
701 )
702 })
703 }
704
705 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
706 self.plan_info
707 .as_ref()
708 .and_then(|plan| plan.trial_started_at)
709 .map(|trial_started_at| trial_started_at.0)
710 }
711
712 /// Returns whether the user's account is too new to use the service.
713 pub fn account_too_young(&self) -> bool {
714 self.plan_info
715 .as_ref()
716 .map(|plan| plan.is_account_too_young)
717 .unwrap_or_default()
718 }
719
720 /// Returns whether the current user has overdue invoices and usage should be blocked.
721 pub fn has_overdue_invoices(&self) -> bool {
722 self.plan_info
723 .as_ref()
724 .map(|plan| plan.has_overdue_invoices)
725 .unwrap_or_default()
726 }
727
728 pub fn is_usage_based_billing_enabled(&self) -> bool {
729 self.plan_info
730 .as_ref()
731 .map(|plan| plan.is_usage_based_billing_enabled)
732 .unwrap_or_default()
733 }
734
735 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
736 if self.plan().is_some_and(|plan| plan.is_v2()) {
737 return None;
738 }
739
740 self.model_request_usage
741 }
742
743 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
744 self.model_request_usage = Some(usage);
745 cx.notify();
746 }
747
748 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
749 self.edit_prediction_usage
750 }
751
752 pub fn update_edit_prediction_usage(
753 &mut self,
754 usage: EditPredictionUsage,
755 cx: &mut Context<Self>,
756 ) {
757 self.edit_prediction_usage = Some(usage);
758 cx.notify();
759 }
760
761 pub fn clear_plan_and_usage(&mut self) {
762 self.plan_info = None;
763 self.model_request_usage = None;
764 self.edit_prediction_usage = None;
765 }
766
767 fn update_authenticated_user(
768 &mut self,
769 response: GetAuthenticatedUserResponse,
770 cx: &mut Context<Self>,
771 ) {
772 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
773 cx.update_flags(staff, response.feature_flags);
774 if let Some(client) = self.client.upgrade() {
775 client
776 .telemetry
777 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
778 }
779
780 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
781 limit: response.plan.usage.model_requests.limit,
782 amount: response.plan.usage.model_requests.used as i32,
783 }));
784 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
785 limit: response.plan.usage.edit_predictions.limit,
786 amount: response.plan.usage.edit_predictions.used as i32,
787 }));
788 self.plan_info = Some(response.plan);
789 cx.emit(Event::PrivateUserInfoUpdated);
790 }
791
792 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
793 cx.spawn(async move |cx| {
794 match message {
795 MessageToClient::UserUpdated => {
796 let cloud_client = cx
797 .update(|cx| {
798 this.read_with(cx, |this, _cx| {
799 this.client.upgrade().map(|client| client.cloud_client())
800 })
801 })?
802 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
803
804 let response = cloud_client.get_authenticated_user().await?;
805 cx.update(|cx| {
806 this.update(cx, |this, cx| {
807 this.update_authenticated_user(response, cx);
808 })
809 })?;
810 }
811 }
812
813 anyhow::Ok(())
814 })
815 .detach_and_log_err(cx);
816 }
817
818 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
819 self.current_user.clone()
820 }
821
822 fn load_users(
823 &self,
824 request: impl RequestMessage<Response = UsersResponse>,
825 cx: &Context<Self>,
826 ) -> Task<Result<Vec<Arc<User>>>> {
827 let client = self.client.clone();
828 cx.spawn(async move |this, cx| {
829 if let Some(rpc) = client.upgrade() {
830 let response = rpc.request(request).await.context("error loading users")?;
831 let users = response.users;
832
833 this.update(cx, |this, _| this.insert(users))
834 } else {
835 Ok(Vec::new())
836 }
837 })
838 }
839
840 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
841 let mut ret = Vec::with_capacity(users.len());
842 for user in users {
843 let user = User::new(user);
844 if let Some(old) = self.users.insert(user.id, user.clone())
845 && old.github_login != user.github_login
846 {
847 self.by_github_login.remove(&old.github_login);
848 }
849 self.by_github_login
850 .insert(user.github_login.clone(), user.id);
851 ret.push(user)
852 }
853 ret
854 }
855
856 pub fn set_participant_indices(
857 &mut self,
858 participant_indices: HashMap<u64, ParticipantIndex>,
859 cx: &mut Context<Self>,
860 ) {
861 if participant_indices != self.participant_indices {
862 self.participant_indices = participant_indices;
863 cx.emit(Event::ParticipantIndicesChanged);
864 }
865 }
866
867 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
868 &self.participant_indices
869 }
870
871 pub fn participant_names(
872 &self,
873 user_ids: impl Iterator<Item = u64>,
874 cx: &App,
875 ) -> HashMap<u64, SharedString> {
876 let mut ret = HashMap::default();
877 let mut missing_user_ids = Vec::new();
878 for id in user_ids {
879 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
880 ret.insert(id, github_login);
881 } else {
882 missing_user_ids.push(id)
883 }
884 }
885 if !missing_user_ids.is_empty() {
886 let this = self.weak_self.clone();
887 cx.spawn(async move |cx| {
888 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
889 .await
890 })
891 .detach_and_log_err(cx);
892 }
893 ret
894 }
895}
896
897impl User {
898 fn new(message: proto::User) -> Arc<Self> {
899 Arc::new(User {
900 id: message.id,
901 github_login: message.github_login.into(),
902 avatar_uri: message.avatar_url.into(),
903 name: message.name,
904 })
905 }
906}
907
908impl Contact {
909 async fn from_proto(
910 contact: proto::Contact,
911 user_store: &Entity<UserStore>,
912 cx: &mut AsyncApp,
913 ) -> Result<Self> {
914 let user = user_store
915 .update(cx, |user_store, cx| {
916 user_store.get_user(contact.user_id, cx)
917 })
918 .await?;
919 Ok(Self {
920 user,
921 online: contact.online,
922 busy: contact.busy,
923 })
924 }
925}
926
927impl Collaborator {
928 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
929 Ok(Self {
930 peer_id: message.peer_id.context("invalid peer id")?,
931 replica_id: ReplicaId::new(message.replica_id as u16),
932 user_id: message.user_id as UserId,
933 is_host: message.is_host,
934 committer_name: message.committer_name,
935 committer_email: message.committer_email,
936 })
937 }
938}
939
940impl RequestUsage {
941 pub fn over_limit(&self) -> bool {
942 match self.limit {
943 UsageLimit::Limited(limit) => self.amount >= limit,
944 UsageLimit::Unlimited => false,
945 }
946 }
947
948 fn from_headers(
949 limit_name: &str,
950 amount_name: &str,
951 headers: &HeaderMap<HeaderValue>,
952 ) -> Result<Self> {
953 let limit = headers
954 .get(limit_name)
955 .with_context(|| format!("missing {limit_name:?} header"))?;
956 let limit = UsageLimit::from_str(limit.to_str()?)?;
957
958 let amount = headers
959 .get(amount_name)
960 .with_context(|| format!("missing {amount_name:?} header"))?;
961 let amount = amount.to_str()?.parse::<i32>()?;
962
963 Ok(Self { limit, amount })
964 }
965}
966
967impl ModelRequestUsage {
968 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
969 Ok(Self(RequestUsage::from_headers(
970 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
971 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
972 headers,
973 )?))
974 }
975}
976
977impl EditPredictionUsage {
978 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
979 Ok(Self(RequestUsage::from_headers(
980 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
981 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
982 headers,
983 )?))
984 }
985}