1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
8 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
9};
10use collections::{HashMap, HashSet, hash_map::Entry};
11use derive_more::Deref;
12use feature_flags::FeatureFlagAppExt;
13use futures::{Future, StreamExt, channel::mpsc};
14use gpui::{
15 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
16};
17use http_client::http::{HeaderMap, HeaderValue};
18use postage::{sink::Sink, watch};
19use rpc::proto::{RequestMessage, UsersResponse};
20use std::{
21 str::FromStr as _,
22 sync::{Arc, Weak},
23};
24use text::ReplicaId;
25use util::{ResultExt, TryFutureExt as _};
26
27pub type UserId = u64;
28
29#[derive(
30 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
31)]
32pub struct ChannelId(pub u64);
33
34impl std::fmt::Display for ChannelId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 self.0.fmt(f)
37 }
38}
39
40#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
41pub struct ProjectId(pub u64);
42
43impl ProjectId {
44 pub fn to_proto(&self) -> u64 {
45 self.0
46 }
47}
48
49#[derive(
50 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
51)]
52pub struct DevServerProjectId(pub u64);
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub struct ParticipantIndex(pub u32);
56
57#[derive(Default, Debug)]
58pub struct User {
59 pub id: UserId,
60 pub github_login: SharedString,
61 pub avatar_uri: SharedUri,
62 pub name: Option<String>,
63}
64
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct Collaborator {
67 pub peer_id: proto::PeerId,
68 pub replica_id: ReplicaId,
69 pub user_id: UserId,
70 pub is_host: bool,
71 pub committer_name: Option<String>,
72 pub committer_email: Option<String>,
73}
74
75impl PartialOrd for User {
76 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
77 Some(self.cmp(other))
78 }
79}
80
81impl Ord for User {
82 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
83 self.github_login.cmp(&other.github_login)
84 }
85}
86
87impl PartialEq for User {
88 fn eq(&self, other: &Self) -> bool {
89 self.id == other.id && self.github_login == other.github_login
90 }
91}
92
93impl Eq for User {}
94
95#[derive(Debug, PartialEq)]
96pub struct Contact {
97 pub user: Arc<User>,
98 pub online: bool,
99 pub busy: bool,
100}
101
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub enum ContactRequestStatus {
104 None,
105 RequestSent,
106 RequestReceived,
107 RequestAccepted,
108}
109
110pub struct UserStore {
111 users: HashMap<u64, Arc<User>>,
112 by_github_login: HashMap<SharedString, u64>,
113 participant_indices: HashMap<u64, ParticipantIndex>,
114 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
115 model_request_usage: Option<ModelRequestUsage>,
116 edit_prediction_usage: Option<EditPredictionUsage>,
117 plan_info: Option<PlanInfo>,
118 current_user: watch::Receiver<Option<Arc<User>>>,
119 accepted_tos_at: Option<Option<cloud_api_client::Timestamp>>,
120 contacts: Vec<Arc<Contact>>,
121 incoming_contact_requests: Vec<Arc<User>>,
122 outgoing_contact_requests: Vec<Arc<User>>,
123 pending_contact_requests: HashMap<u64, usize>,
124 invite_info: Option<InviteInfo>,
125 client: Weak<Client>,
126 _maintain_contacts: Task<()>,
127 _maintain_current_user: Task<Result<()>>,
128 weak_self: WeakEntity<Self>,
129}
130
131#[derive(Clone)]
132pub struct InviteInfo {
133 pub count: u32,
134 pub url: Arc<str>,
135}
136
137pub enum Event {
138 Contact {
139 user: Arc<User>,
140 kind: ContactEventKind,
141 },
142 ShowContacts,
143 ParticipantIndicesChanged,
144 PrivateUserInfoUpdated,
145 PlanUpdated,
146}
147
148#[derive(Clone, Copy)]
149pub enum ContactEventKind {
150 Requested,
151 Accepted,
152 Cancelled,
153}
154
155impl EventEmitter<Event> for UserStore {}
156
157enum UpdateContacts {
158 Update(proto::UpdateContacts),
159 Wait(postage::barrier::Sender),
160 Clear(postage::barrier::Sender),
161}
162
163#[derive(Debug, Clone, Copy, Deref)]
164pub struct ModelRequestUsage(pub RequestUsage);
165
166#[derive(Debug, Clone, Copy, Deref)]
167pub struct EditPredictionUsage(pub RequestUsage);
168
169#[derive(Debug, Clone, Copy)]
170pub struct RequestUsage {
171 pub limit: UsageLimit,
172 pub amount: i32,
173}
174
175impl UserStore {
176 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
177 let (mut current_user_tx, current_user_rx) = watch::channel();
178 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
179 let rpc_subscriptions = vec![
180 client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
181 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
182 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
183 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
184 ];
185
186 client.add_message_to_client_handler({
187 let this = cx.weak_entity();
188 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
189 });
190
191 Self {
192 users: Default::default(),
193 by_github_login: Default::default(),
194 current_user: current_user_rx,
195 plan_info: None,
196 model_request_usage: None,
197 edit_prediction_usage: None,
198 accepted_tos_at: None,
199 contacts: Default::default(),
200 incoming_contact_requests: Default::default(),
201 participant_indices: Default::default(),
202 outgoing_contact_requests: Default::default(),
203 invite_info: None,
204 client: Arc::downgrade(&client),
205 update_contacts_tx,
206 _maintain_contacts: cx.spawn(async move |this, cx| {
207 let _subscriptions = rpc_subscriptions;
208 while let Some(message) = update_contacts_rx.next().await {
209 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
210 {
211 task.log_err().await;
212 } else {
213 break;
214 }
215 }
216 }),
217 _maintain_current_user: cx.spawn(async move |this, cx| {
218 let mut status = client.status();
219 let weak = Arc::downgrade(&client);
220 drop(client);
221 while let Some(status) = status.next().await {
222 // if the client is dropped, the app is shutting down.
223 let Some(client) = weak.upgrade() else {
224 return Ok(());
225 };
226 match status {
227 Status::Authenticated | Status::Connected { .. } => {
228 if let Some(user_id) = client.user_id() {
229 let response = client.cloud_client().get_authenticated_user().await;
230 let mut current_user = None;
231 cx.update(|cx| {
232 if let Some(response) = response.log_err() {
233 let user = Arc::new(User {
234 id: user_id,
235 github_login: response.user.github_login.clone().into(),
236 avatar_uri: response.user.avatar_url.clone().into(),
237 name: response.user.name.clone(),
238 });
239 current_user = Some(user.clone());
240 this.update(cx, |this, cx| {
241 this.by_github_login
242 .insert(user.github_login.clone(), user_id);
243 this.users.insert(user_id, user);
244 this.update_authenticated_user(response, cx)
245 })
246 } else {
247 anyhow::Ok(())
248 }
249 })??;
250 current_user_tx.send(current_user).await.ok();
251
252 this.update(cx, |_, cx| cx.notify())?;
253 }
254 }
255 Status::SignedOut => {
256 current_user_tx.send(None).await.ok();
257 this.update(cx, |this, cx| {
258 this.accepted_tos_at = None;
259 cx.emit(Event::PrivateUserInfoUpdated);
260 cx.notify();
261 this.clear_contacts()
262 })?
263 .await;
264 }
265 Status::ConnectionLost => {
266 this.update(cx, |this, cx| {
267 cx.notify();
268 this.clear_contacts()
269 })?
270 .await;
271 }
272 _ => {}
273 }
274 }
275 Ok(())
276 }),
277 pending_contact_requests: Default::default(),
278 weak_self: cx.weak_entity(),
279 }
280 }
281
282 #[cfg(feature = "test-support")]
283 pub fn clear_cache(&mut self) {
284 self.users.clear();
285 self.by_github_login.clear();
286 }
287
288 async fn handle_update_invite_info(
289 this: Entity<Self>,
290 message: TypedEnvelope<proto::UpdateInviteInfo>,
291 mut cx: AsyncApp,
292 ) -> Result<()> {
293 this.update(&mut cx, |this, cx| {
294 this.invite_info = Some(InviteInfo {
295 url: Arc::from(message.payload.url),
296 count: message.payload.count,
297 });
298 cx.notify();
299 })?;
300 Ok(())
301 }
302
303 async fn handle_show_contacts(
304 this: Entity<Self>,
305 _: TypedEnvelope<proto::ShowContacts>,
306 mut cx: AsyncApp,
307 ) -> Result<()> {
308 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
309 Ok(())
310 }
311
312 pub fn invite_info(&self) -> Option<&InviteInfo> {
313 self.invite_info.as_ref()
314 }
315
316 async fn handle_update_contacts(
317 this: Entity<Self>,
318 message: TypedEnvelope<proto::UpdateContacts>,
319 mut cx: AsyncApp,
320 ) -> Result<()> {
321 this.read_with(&mut cx, |this, _| {
322 this.update_contacts_tx
323 .unbounded_send(UpdateContacts::Update(message.payload))
324 .unwrap();
325 })?;
326 Ok(())
327 }
328
329 async fn handle_update_plan(
330 this: Entity<Self>,
331 _message: TypedEnvelope<proto::UpdateUserPlan>,
332 mut cx: AsyncApp,
333 ) -> Result<()> {
334 let client = this
335 .read_with(&cx, |this, _| this.client.upgrade())?
336 .context("client was dropped")?;
337
338 let response = client
339 .cloud_client()
340 .get_authenticated_user()
341 .await
342 .context("failed to fetch authenticated user")?;
343
344 this.update(&mut cx, |this, cx| {
345 this.update_authenticated_user(response, cx);
346 })
347 }
348
349 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
350 match message {
351 UpdateContacts::Wait(barrier) => {
352 drop(barrier);
353 Task::ready(Ok(()))
354 }
355 UpdateContacts::Clear(barrier) => {
356 self.contacts.clear();
357 self.incoming_contact_requests.clear();
358 self.outgoing_contact_requests.clear();
359 drop(barrier);
360 Task::ready(Ok(()))
361 }
362 UpdateContacts::Update(message) => {
363 let mut user_ids = HashSet::default();
364 for contact in &message.contacts {
365 user_ids.insert(contact.user_id);
366 }
367 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
368 user_ids.extend(message.outgoing_requests.iter());
369
370 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
371 cx.spawn(async move |this, cx| {
372 load_users.await?;
373
374 // Users are fetched in parallel above and cached in call to get_users
375 // No need to parallelize here
376 let mut updated_contacts = Vec::new();
377 let this = this.upgrade().context("can't upgrade user store handle")?;
378 for contact in message.contacts {
379 updated_contacts
380 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
381 }
382
383 let mut incoming_requests = Vec::new();
384 for request in message.incoming_requests {
385 incoming_requests.push({
386 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
387 .await?
388 });
389 }
390
391 let mut outgoing_requests = Vec::new();
392 for requested_user_id in message.outgoing_requests {
393 outgoing_requests.push(
394 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
395 .await?,
396 );
397 }
398
399 let removed_contacts =
400 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
401 let removed_incoming_requests =
402 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
403 let removed_outgoing_requests =
404 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
405
406 this.update(cx, |this, cx| {
407 // Remove contacts
408 this.contacts
409 .retain(|contact| !removed_contacts.contains(&contact.user.id));
410 // Update existing contacts and insert new ones
411 for updated_contact in updated_contacts {
412 match this.contacts.binary_search_by_key(
413 &&updated_contact.user.github_login,
414 |contact| &contact.user.github_login,
415 ) {
416 Ok(ix) => this.contacts[ix] = updated_contact,
417 Err(ix) => this.contacts.insert(ix, updated_contact),
418 }
419 }
420
421 // Remove incoming contact requests
422 this.incoming_contact_requests.retain(|user| {
423 if removed_incoming_requests.contains(&user.id) {
424 cx.emit(Event::Contact {
425 user: user.clone(),
426 kind: ContactEventKind::Cancelled,
427 });
428 false
429 } else {
430 true
431 }
432 });
433 // Update existing incoming requests and insert new ones
434 for user in incoming_requests {
435 match this
436 .incoming_contact_requests
437 .binary_search_by_key(&&user.github_login, |contact| {
438 &contact.github_login
439 }) {
440 Ok(ix) => this.incoming_contact_requests[ix] = user,
441 Err(ix) => this.incoming_contact_requests.insert(ix, user),
442 }
443 }
444
445 // Remove outgoing contact requests
446 this.outgoing_contact_requests
447 .retain(|user| !removed_outgoing_requests.contains(&user.id));
448 // Update existing incoming requests and insert new ones
449 for request in outgoing_requests {
450 match this
451 .outgoing_contact_requests
452 .binary_search_by_key(&&request.github_login, |contact| {
453 &contact.github_login
454 }) {
455 Ok(ix) => this.outgoing_contact_requests[ix] = request,
456 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
457 }
458 }
459
460 cx.notify();
461 })?;
462
463 Ok(())
464 })
465 }
466 }
467 }
468
469 pub fn contacts(&self) -> &[Arc<Contact>] {
470 &self.contacts
471 }
472
473 pub fn has_contact(&self, user: &Arc<User>) -> bool {
474 self.contacts
475 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
476 .is_ok()
477 }
478
479 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
480 &self.incoming_contact_requests
481 }
482
483 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
484 &self.outgoing_contact_requests
485 }
486
487 pub fn is_contact_request_pending(&self, user: &User) -> bool {
488 self.pending_contact_requests.contains_key(&user.id)
489 }
490
491 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
492 if self
493 .contacts
494 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
495 .is_ok()
496 {
497 ContactRequestStatus::RequestAccepted
498 } else if self
499 .outgoing_contact_requests
500 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
501 .is_ok()
502 {
503 ContactRequestStatus::RequestSent
504 } else if self
505 .incoming_contact_requests
506 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
507 .is_ok()
508 {
509 ContactRequestStatus::RequestReceived
510 } else {
511 ContactRequestStatus::None
512 }
513 }
514
515 pub fn request_contact(
516 &mut self,
517 responder_id: u64,
518 cx: &mut Context<Self>,
519 ) -> Task<Result<()>> {
520 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
521 }
522
523 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
524 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
525 }
526
527 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
528 self.incoming_contact_requests
529 .iter()
530 .any(|user| user.id == user_id)
531 }
532
533 pub fn respond_to_contact_request(
534 &mut self,
535 requester_id: u64,
536 accept: bool,
537 cx: &mut Context<Self>,
538 ) -> Task<Result<()>> {
539 self.perform_contact_request(
540 requester_id,
541 proto::RespondToContactRequest {
542 requester_id,
543 response: if accept {
544 proto::ContactRequestResponse::Accept
545 } else {
546 proto::ContactRequestResponse::Decline
547 } as i32,
548 },
549 cx,
550 )
551 }
552
553 pub fn dismiss_contact_request(
554 &self,
555 requester_id: u64,
556 cx: &Context<Self>,
557 ) -> Task<Result<()>> {
558 let client = self.client.upgrade();
559 cx.spawn(async move |_, _| {
560 client
561 .context("can't upgrade client reference")?
562 .request(proto::RespondToContactRequest {
563 requester_id,
564 response: proto::ContactRequestResponse::Dismiss as i32,
565 })
566 .await?;
567 Ok(())
568 })
569 }
570
571 fn perform_contact_request<T: RequestMessage>(
572 &mut self,
573 user_id: u64,
574 request: T,
575 cx: &mut Context<Self>,
576 ) -> Task<Result<()>> {
577 let client = self.client.upgrade();
578 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
579 cx.notify();
580
581 cx.spawn(async move |this, cx| {
582 let response = client
583 .context("can't upgrade client reference")?
584 .request(request)
585 .await;
586 this.update(cx, |this, cx| {
587 if let Entry::Occupied(mut request_count) =
588 this.pending_contact_requests.entry(user_id)
589 {
590 *request_count.get_mut() -= 1;
591 if *request_count.get() == 0 {
592 request_count.remove();
593 }
594 }
595 cx.notify();
596 })?;
597 response?;
598 Ok(())
599 })
600 }
601
602 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
603 let (tx, mut rx) = postage::barrier::channel();
604 self.update_contacts_tx
605 .unbounded_send(UpdateContacts::Clear(tx))
606 .unwrap();
607 async move {
608 rx.next().await;
609 }
610 }
611
612 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
613 let (tx, mut rx) = postage::barrier::channel();
614 self.update_contacts_tx
615 .unbounded_send(UpdateContacts::Wait(tx))
616 .unwrap();
617 async move {
618 rx.next().await;
619 }
620 }
621
622 pub fn get_users(
623 &self,
624 user_ids: Vec<u64>,
625 cx: &Context<Self>,
626 ) -> Task<Result<Vec<Arc<User>>>> {
627 let mut user_ids_to_fetch = user_ids.clone();
628 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
629
630 cx.spawn(async move |this, cx| {
631 if !user_ids_to_fetch.is_empty() {
632 this.update(cx, |this, cx| {
633 this.load_users(
634 proto::GetUsers {
635 user_ids: user_ids_to_fetch,
636 },
637 cx,
638 )
639 })?
640 .await?;
641 }
642
643 this.read_with(cx, |this, _| {
644 user_ids
645 .iter()
646 .map(|user_id| {
647 this.users
648 .get(user_id)
649 .cloned()
650 .with_context(|| format!("user {user_id} not found"))
651 })
652 .collect()
653 })?
654 })
655 }
656
657 pub fn fuzzy_search_users(
658 &self,
659 query: String,
660 cx: &Context<Self>,
661 ) -> Task<Result<Vec<Arc<User>>>> {
662 self.load_users(proto::FuzzySearchUsers { query }, cx)
663 }
664
665 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
666 self.users.get(&user_id).cloned()
667 }
668
669 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
670 if let Some(user) = self.users.get(&user_id).cloned() {
671 return Some(user);
672 }
673
674 self.get_user(user_id, cx).detach_and_log_err(cx);
675 None
676 }
677
678 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
679 if let Some(user) = self.users.get(&user_id).cloned() {
680 return Task::ready(Ok(user));
681 }
682
683 let load_users = self.get_users(vec![user_id], cx);
684 cx.spawn(async move |this, cx| {
685 load_users.await?;
686 this.read_with(cx, |this, _| {
687 this.users
688 .get(&user_id)
689 .cloned()
690 .context("server responded with no users")
691 })?
692 })
693 }
694
695 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
696 self.by_github_login
697 .get(github_login)
698 .and_then(|id| self.users.get(id).cloned())
699 }
700
701 pub fn current_user(&self) -> Option<Arc<User>> {
702 self.current_user.borrow().clone()
703 }
704
705 pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
706 #[cfg(debug_assertions)]
707 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
708 return match plan.as_str() {
709 "free" => Some(cloud_llm_client::Plan::ZedFree),
710 "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
711 "pro" => Some(cloud_llm_client::Plan::ZedPro),
712 _ => {
713 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
714 }
715 };
716 }
717
718 self.plan_info.as_ref().map(|info| info.plan)
719 }
720
721 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
722 self.plan_info
723 .as_ref()
724 .and_then(|plan| plan.subscription_period)
725 .map(|subscription_period| {
726 (
727 subscription_period.started_at.0,
728 subscription_period.ended_at.0,
729 )
730 })
731 }
732
733 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
734 self.plan_info
735 .as_ref()
736 .and_then(|plan| plan.trial_started_at)
737 .map(|trial_started_at| trial_started_at.0)
738 }
739
740 /// Returns whether the user's account is too new to use the service.
741 pub fn account_too_young(&self) -> bool {
742 self.plan_info
743 .as_ref()
744 .map(|plan| plan.is_account_too_young)
745 .unwrap_or_default()
746 }
747
748 /// Returns whether the current user has overdue invoices and usage should be blocked.
749 pub fn has_overdue_invoices(&self) -> bool {
750 self.plan_info
751 .as_ref()
752 .map(|plan| plan.has_overdue_invoices)
753 .unwrap_or_default()
754 }
755
756 pub fn is_usage_based_billing_enabled(&self) -> bool {
757 self.plan_info
758 .as_ref()
759 .map(|plan| plan.is_usage_based_billing_enabled)
760 .unwrap_or_default()
761 }
762
763 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
764 self.model_request_usage
765 }
766
767 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
768 self.model_request_usage = Some(usage);
769 cx.notify();
770 }
771
772 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
773 self.edit_prediction_usage
774 }
775
776 pub fn update_edit_prediction_usage(
777 &mut self,
778 usage: EditPredictionUsage,
779 cx: &mut Context<Self>,
780 ) {
781 self.edit_prediction_usage = Some(usage);
782 cx.notify();
783 }
784
785 fn update_authenticated_user(
786 &mut self,
787 response: GetAuthenticatedUserResponse,
788 cx: &mut Context<Self>,
789 ) {
790 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
791 cx.update_flags(staff, response.feature_flags);
792 if let Some(client) = self.client.upgrade() {
793 client
794 .telemetry
795 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
796 }
797
798 let accepted_tos_at = {
799 #[cfg(debug_assertions)]
800 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok() {
801 None
802 } else {
803 response.user.accepted_tos_at
804 }
805
806 #[cfg(not(debug_assertions))]
807 response.user.accepted_tos_at
808 };
809
810 self.accepted_tos_at = Some(accepted_tos_at);
811 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
812 limit: response.plan.usage.model_requests.limit,
813 amount: response.plan.usage.model_requests.used as i32,
814 }));
815 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
816 limit: response.plan.usage.edit_predictions.limit,
817 amount: response.plan.usage.edit_predictions.used as i32,
818 }));
819 self.plan_info = Some(response.plan);
820 cx.emit(Event::PrivateUserInfoUpdated);
821 }
822
823 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
824 cx.spawn(async move |cx| {
825 match message {
826 MessageToClient::UserUpdated => {
827 let cloud_client = cx
828 .update(|cx| {
829 this.read_with(cx, |this, _cx| {
830 this.client.upgrade().map(|client| client.cloud_client())
831 })
832 })??
833 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
834
835 let response = cloud_client.get_authenticated_user().await?;
836 cx.update(|cx| {
837 this.update(cx, |this, cx| {
838 this.update_authenticated_user(response, cx);
839 })
840 })??;
841 }
842 }
843
844 anyhow::Ok(())
845 })
846 .detach_and_log_err(cx);
847 }
848
849 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
850 self.current_user.clone()
851 }
852
853 pub fn has_accepted_terms_of_service(&self) -> bool {
854 self.accepted_tos_at
855 .map_or(false, |accepted_tos_at| accepted_tos_at.is_some())
856 }
857
858 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
859 if self.current_user().is_none() {
860 return Task::ready(Err(anyhow!("no current user")));
861 };
862
863 let client = self.client.clone();
864 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
865 let client = client.upgrade().context("client not found")?;
866 let response = client
867 .cloud_client()
868 .accept_terms_of_service()
869 .await
870 .context("error accepting tos")?;
871 this.update(cx, |this, cx| {
872 this.accepted_tos_at = Some(response.user.accepted_tos_at);
873 cx.emit(Event::PrivateUserInfoUpdated);
874 })?;
875 Ok(())
876 })
877 }
878
879 fn load_users(
880 &self,
881 request: impl RequestMessage<Response = UsersResponse>,
882 cx: &Context<Self>,
883 ) -> Task<Result<Vec<Arc<User>>>> {
884 let client = self.client.clone();
885 cx.spawn(async move |this, cx| {
886 if let Some(rpc) = client.upgrade() {
887 let response = rpc.request(request).await.context("error loading users")?;
888 let users = response.users;
889
890 this.update(cx, |this, _| this.insert(users))
891 } else {
892 Ok(Vec::new())
893 }
894 })
895 }
896
897 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
898 let mut ret = Vec::with_capacity(users.len());
899 for user in users {
900 let user = User::new(user);
901 if let Some(old) = self.users.insert(user.id, user.clone()) {
902 if old.github_login != user.github_login {
903 self.by_github_login.remove(&old.github_login);
904 }
905 }
906 self.by_github_login
907 .insert(user.github_login.clone(), user.id);
908 ret.push(user)
909 }
910 ret
911 }
912
913 pub fn set_participant_indices(
914 &mut self,
915 participant_indices: HashMap<u64, ParticipantIndex>,
916 cx: &mut Context<Self>,
917 ) {
918 if participant_indices != self.participant_indices {
919 self.participant_indices = participant_indices;
920 cx.emit(Event::ParticipantIndicesChanged);
921 }
922 }
923
924 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
925 &self.participant_indices
926 }
927
928 pub fn participant_names(
929 &self,
930 user_ids: impl Iterator<Item = u64>,
931 cx: &App,
932 ) -> HashMap<u64, SharedString> {
933 let mut ret = HashMap::default();
934 let mut missing_user_ids = Vec::new();
935 for id in user_ids {
936 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
937 ret.insert(id, github_login);
938 } else {
939 missing_user_ids.push(id)
940 }
941 }
942 if !missing_user_ids.is_empty() {
943 let this = self.weak_self.clone();
944 cx.spawn(async move |cx| {
945 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
946 .await
947 })
948 .detach_and_log_err(cx);
949 }
950 ret
951 }
952}
953
954impl User {
955 fn new(message: proto::User) -> Arc<Self> {
956 Arc::new(User {
957 id: message.id,
958 github_login: message.github_login.into(),
959 avatar_uri: message.avatar_url.into(),
960 name: message.name,
961 })
962 }
963}
964
965impl Contact {
966 async fn from_proto(
967 contact: proto::Contact,
968 user_store: &Entity<UserStore>,
969 cx: &mut AsyncApp,
970 ) -> Result<Self> {
971 let user = user_store
972 .update(cx, |user_store, cx| {
973 user_store.get_user(contact.user_id, cx)
974 })?
975 .await?;
976 Ok(Self {
977 user,
978 online: contact.online,
979 busy: contact.busy,
980 })
981 }
982}
983
984impl Collaborator {
985 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
986 Ok(Self {
987 peer_id: message.peer_id.context("invalid peer id")?,
988 replica_id: message.replica_id as ReplicaId,
989 user_id: message.user_id as UserId,
990 is_host: message.is_host,
991 committer_name: message.committer_name,
992 committer_email: message.committer_email,
993 })
994 }
995}
996
997impl RequestUsage {
998 pub fn over_limit(&self) -> bool {
999 match self.limit {
1000 UsageLimit::Limited(limit) => self.amount >= limit,
1001 UsageLimit::Unlimited => false,
1002 }
1003 }
1004
1005 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
1006 let limit = match limit.variant? {
1007 proto::usage_limit::Variant::Limited(limited) => {
1008 UsageLimit::Limited(limited.limit as i32)
1009 }
1010 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
1011 };
1012 Some(RequestUsage {
1013 limit,
1014 amount: amount as i32,
1015 })
1016 }
1017
1018 fn from_headers(
1019 limit_name: &str,
1020 amount_name: &str,
1021 headers: &HeaderMap<HeaderValue>,
1022 ) -> Result<Self> {
1023 let limit = headers
1024 .get(limit_name)
1025 .with_context(|| format!("missing {limit_name:?} header"))?;
1026 let limit = UsageLimit::from_str(limit.to_str()?)?;
1027
1028 let amount = headers
1029 .get(amount_name)
1030 .with_context(|| format!("missing {amount_name:?} header"))?;
1031 let amount = amount.to_str()?.parse::<i32>()?;
1032
1033 Ok(Self { limit, amount })
1034 }
1035}
1036
1037impl ModelRequestUsage {
1038 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1039 Ok(Self(RequestUsage::from_headers(
1040 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1041 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1042 headers,
1043 )?))
1044 }
1045}
1046
1047impl EditPredictionUsage {
1048 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1049 Ok(Self(RequestUsage::from_headers(
1050 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1051 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1052 headers,
1053 )?))
1054 }
1055}