1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
5use cloud_llm_client::{
6 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
7 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
8};
9use collections::{HashMap, HashSet, hash_map::Entry};
10use derive_more::Deref;
11use feature_flags::FeatureFlagAppExt;
12use futures::{Future, StreamExt, channel::mpsc};
13use gpui::{
14 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
15};
16use http_client::http::{HeaderMap, HeaderValue};
17use postage::{sink::Sink, watch};
18use rpc::proto::{RequestMessage, UsersResponse};
19use std::{
20 str::FromStr as _,
21 sync::{Arc, Weak},
22};
23use text::ReplicaId;
24use util::{ResultExt, TryFutureExt as _};
25
26pub type UserId = u64;
27
28#[derive(
29 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
30)]
31pub struct ChannelId(pub u64);
32
33impl std::fmt::Display for ChannelId {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 self.0.fmt(f)
36 }
37}
38
39#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
40pub struct ProjectId(pub u64);
41
42impl ProjectId {
43 pub fn to_proto(&self) -> u64 {
44 self.0
45 }
46}
47
48#[derive(
49 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
50)]
51pub struct DevServerProjectId(pub u64);
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub struct ParticipantIndex(pub u32);
55
56#[derive(Default, Debug)]
57pub struct User {
58 pub id: UserId,
59 pub github_login: SharedString,
60 pub avatar_uri: SharedUri,
61 pub name: Option<String>,
62}
63
64#[derive(Clone, Debug, PartialEq, Eq)]
65pub struct Collaborator {
66 pub peer_id: proto::PeerId,
67 pub replica_id: ReplicaId,
68 pub user_id: UserId,
69 pub is_host: bool,
70 pub committer_name: Option<String>,
71 pub committer_email: Option<String>,
72}
73
74impl PartialOrd for User {
75 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
76 Some(self.cmp(other))
77 }
78}
79
80impl Ord for User {
81 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
82 self.github_login.cmp(&other.github_login)
83 }
84}
85
86impl PartialEq for User {
87 fn eq(&self, other: &Self) -> bool {
88 self.id == other.id && self.github_login == other.github_login
89 }
90}
91
92impl Eq for User {}
93
94#[derive(Debug, PartialEq)]
95pub struct Contact {
96 pub user: Arc<User>,
97 pub online: bool,
98 pub busy: bool,
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
102pub enum ContactRequestStatus {
103 None,
104 RequestSent,
105 RequestReceived,
106 RequestAccepted,
107}
108
109pub struct UserStore {
110 users: HashMap<u64, Arc<User>>,
111 by_github_login: HashMap<SharedString, u64>,
112 participant_indices: HashMap<u64, ParticipantIndex>,
113 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
114 model_request_usage: Option<ModelRequestUsage>,
115 edit_prediction_usage: Option<EditPredictionUsage>,
116 plan_info: Option<PlanInfo>,
117 current_user: watch::Receiver<Option<Arc<User>>>,
118 accepted_tos_at: Option<Option<cloud_api_client::Timestamp>>,
119 contacts: Vec<Arc<Contact>>,
120 incoming_contact_requests: Vec<Arc<User>>,
121 outgoing_contact_requests: Vec<Arc<User>>,
122 pending_contact_requests: HashMap<u64, usize>,
123 invite_info: Option<InviteInfo>,
124 client: Weak<Client>,
125 _maintain_contacts: Task<()>,
126 _maintain_current_user: Task<Result<()>>,
127 weak_self: WeakEntity<Self>,
128}
129
130#[derive(Clone)]
131pub struct InviteInfo {
132 pub count: u32,
133 pub url: Arc<str>,
134}
135
136pub enum Event {
137 Contact {
138 user: Arc<User>,
139 kind: ContactEventKind,
140 },
141 ShowContacts,
142 ParticipantIndicesChanged,
143 PrivateUserInfoUpdated,
144 PlanUpdated,
145}
146
147#[derive(Clone, Copy)]
148pub enum ContactEventKind {
149 Requested,
150 Accepted,
151 Cancelled,
152}
153
154impl EventEmitter<Event> for UserStore {}
155
156enum UpdateContacts {
157 Update(proto::UpdateContacts),
158 Wait(postage::barrier::Sender),
159 Clear(postage::barrier::Sender),
160}
161
162#[derive(Debug, Clone, Copy, Deref)]
163pub struct ModelRequestUsage(pub RequestUsage);
164
165#[derive(Debug, Clone, Copy, Deref)]
166pub struct EditPredictionUsage(pub RequestUsage);
167
168#[derive(Debug, Clone, Copy)]
169pub struct RequestUsage {
170 pub limit: UsageLimit,
171 pub amount: i32,
172}
173
174impl UserStore {
175 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
176 let (mut current_user_tx, current_user_rx) = watch::channel();
177 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
178 let rpc_subscriptions = vec![
179 client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
180 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
181 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
182 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
183 ];
184 Self {
185 users: Default::default(),
186 by_github_login: Default::default(),
187 current_user: current_user_rx,
188 plan_info: None,
189 model_request_usage: None,
190 edit_prediction_usage: None,
191 accepted_tos_at: None,
192 contacts: Default::default(),
193 incoming_contact_requests: Default::default(),
194 participant_indices: Default::default(),
195 outgoing_contact_requests: Default::default(),
196 invite_info: None,
197 client: Arc::downgrade(&client),
198 update_contacts_tx,
199 _maintain_contacts: cx.spawn(async move |this, cx| {
200 let _subscriptions = rpc_subscriptions;
201 while let Some(message) = update_contacts_rx.next().await {
202 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
203 {
204 task.log_err().await;
205 } else {
206 break;
207 }
208 }
209 }),
210 _maintain_current_user: cx.spawn(async move |this, cx| {
211 let mut status = client.status();
212 let weak = Arc::downgrade(&client);
213 drop(client);
214 while let Some(status) = status.next().await {
215 // if the client is dropped, the app is shutting down.
216 let Some(client) = weak.upgrade() else {
217 return Ok(());
218 };
219 match status {
220 Status::Authenticated | Status::Connected { .. } => {
221 if let Some(user_id) = client.user_id() {
222 let response = client.cloud_client().get_authenticated_user().await;
223 let mut current_user = None;
224 cx.update(|cx| {
225 if let Some(response) = response.log_err() {
226 let user = Arc::new(User {
227 id: user_id,
228 github_login: response.user.github_login.clone().into(),
229 avatar_uri: response.user.avatar_url.clone().into(),
230 name: response.user.name.clone(),
231 });
232 current_user = Some(user.clone());
233 this.update(cx, |this, cx| {
234 this.by_github_login
235 .insert(user.github_login.clone(), user_id);
236 this.users.insert(user_id, user);
237 this.update_authenticated_user(response, cx)
238 })
239 } else {
240 anyhow::Ok(())
241 }
242 })??;
243 current_user_tx.send(current_user).await.ok();
244
245 this.update(cx, |_, cx| cx.notify())?;
246 }
247 }
248 Status::SignedOut => {
249 current_user_tx.send(None).await.ok();
250 this.update(cx, |this, cx| {
251 this.accepted_tos_at = None;
252 cx.emit(Event::PrivateUserInfoUpdated);
253 cx.notify();
254 this.clear_contacts()
255 })?
256 .await;
257 }
258 Status::ConnectionLost => {
259 this.update(cx, |this, cx| {
260 cx.notify();
261 this.clear_contacts()
262 })?
263 .await;
264 }
265 _ => {}
266 }
267 }
268 Ok(())
269 }),
270 pending_contact_requests: Default::default(),
271 weak_self: cx.weak_entity(),
272 }
273 }
274
275 #[cfg(feature = "test-support")]
276 pub fn clear_cache(&mut self) {
277 self.users.clear();
278 self.by_github_login.clear();
279 }
280
281 async fn handle_update_invite_info(
282 this: Entity<Self>,
283 message: TypedEnvelope<proto::UpdateInviteInfo>,
284 mut cx: AsyncApp,
285 ) -> Result<()> {
286 this.update(&mut cx, |this, cx| {
287 this.invite_info = Some(InviteInfo {
288 url: Arc::from(message.payload.url),
289 count: message.payload.count,
290 });
291 cx.notify();
292 })?;
293 Ok(())
294 }
295
296 async fn handle_show_contacts(
297 this: Entity<Self>,
298 _: TypedEnvelope<proto::ShowContacts>,
299 mut cx: AsyncApp,
300 ) -> Result<()> {
301 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
302 Ok(())
303 }
304
305 pub fn invite_info(&self) -> Option<&InviteInfo> {
306 self.invite_info.as_ref()
307 }
308
309 async fn handle_update_contacts(
310 this: Entity<Self>,
311 message: TypedEnvelope<proto::UpdateContacts>,
312 mut cx: AsyncApp,
313 ) -> Result<()> {
314 this.read_with(&mut cx, |this, _| {
315 this.update_contacts_tx
316 .unbounded_send(UpdateContacts::Update(message.payload))
317 .unwrap();
318 })?;
319 Ok(())
320 }
321
322 async fn handle_update_plan(
323 this: Entity<Self>,
324 _message: TypedEnvelope<proto::UpdateUserPlan>,
325 mut cx: AsyncApp,
326 ) -> Result<()> {
327 let client = this
328 .read_with(&cx, |this, _| this.client.upgrade())?
329 .context("client was dropped")?;
330
331 let response = client
332 .cloud_client()
333 .get_authenticated_user()
334 .await
335 .context("failed to fetch authenticated user")?;
336
337 this.update(&mut cx, |this, cx| {
338 this.update_authenticated_user(response, cx);
339 })
340 }
341
342 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
343 match message {
344 UpdateContacts::Wait(barrier) => {
345 drop(barrier);
346 Task::ready(Ok(()))
347 }
348 UpdateContacts::Clear(barrier) => {
349 self.contacts.clear();
350 self.incoming_contact_requests.clear();
351 self.outgoing_contact_requests.clear();
352 drop(barrier);
353 Task::ready(Ok(()))
354 }
355 UpdateContacts::Update(message) => {
356 let mut user_ids = HashSet::default();
357 for contact in &message.contacts {
358 user_ids.insert(contact.user_id);
359 }
360 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
361 user_ids.extend(message.outgoing_requests.iter());
362
363 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
364 cx.spawn(async move |this, cx| {
365 load_users.await?;
366
367 // Users are fetched in parallel above and cached in call to get_users
368 // No need to parallelize here
369 let mut updated_contacts = Vec::new();
370 let this = this.upgrade().context("can't upgrade user store handle")?;
371 for contact in message.contacts {
372 updated_contacts
373 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
374 }
375
376 let mut incoming_requests = Vec::new();
377 for request in message.incoming_requests {
378 incoming_requests.push({
379 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
380 .await?
381 });
382 }
383
384 let mut outgoing_requests = Vec::new();
385 for requested_user_id in message.outgoing_requests {
386 outgoing_requests.push(
387 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
388 .await?,
389 );
390 }
391
392 let removed_contacts =
393 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
394 let removed_incoming_requests =
395 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
396 let removed_outgoing_requests =
397 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
398
399 this.update(cx, |this, cx| {
400 // Remove contacts
401 this.contacts
402 .retain(|contact| !removed_contacts.contains(&contact.user.id));
403 // Update existing contacts and insert new ones
404 for updated_contact in updated_contacts {
405 match this.contacts.binary_search_by_key(
406 &&updated_contact.user.github_login,
407 |contact| &contact.user.github_login,
408 ) {
409 Ok(ix) => this.contacts[ix] = updated_contact,
410 Err(ix) => this.contacts.insert(ix, updated_contact),
411 }
412 }
413
414 // Remove incoming contact requests
415 this.incoming_contact_requests.retain(|user| {
416 if removed_incoming_requests.contains(&user.id) {
417 cx.emit(Event::Contact {
418 user: user.clone(),
419 kind: ContactEventKind::Cancelled,
420 });
421 false
422 } else {
423 true
424 }
425 });
426 // Update existing incoming requests and insert new ones
427 for user in incoming_requests {
428 match this
429 .incoming_contact_requests
430 .binary_search_by_key(&&user.github_login, |contact| {
431 &contact.github_login
432 }) {
433 Ok(ix) => this.incoming_contact_requests[ix] = user,
434 Err(ix) => this.incoming_contact_requests.insert(ix, user),
435 }
436 }
437
438 // Remove outgoing contact requests
439 this.outgoing_contact_requests
440 .retain(|user| !removed_outgoing_requests.contains(&user.id));
441 // Update existing incoming requests and insert new ones
442 for request in outgoing_requests {
443 match this
444 .outgoing_contact_requests
445 .binary_search_by_key(&&request.github_login, |contact| {
446 &contact.github_login
447 }) {
448 Ok(ix) => this.outgoing_contact_requests[ix] = request,
449 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
450 }
451 }
452
453 cx.notify();
454 })?;
455
456 Ok(())
457 })
458 }
459 }
460 }
461
462 pub fn contacts(&self) -> &[Arc<Contact>] {
463 &self.contacts
464 }
465
466 pub fn has_contact(&self, user: &Arc<User>) -> bool {
467 self.contacts
468 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
469 .is_ok()
470 }
471
472 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
473 &self.incoming_contact_requests
474 }
475
476 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
477 &self.outgoing_contact_requests
478 }
479
480 pub fn is_contact_request_pending(&self, user: &User) -> bool {
481 self.pending_contact_requests.contains_key(&user.id)
482 }
483
484 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
485 if self
486 .contacts
487 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
488 .is_ok()
489 {
490 ContactRequestStatus::RequestAccepted
491 } else if self
492 .outgoing_contact_requests
493 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
494 .is_ok()
495 {
496 ContactRequestStatus::RequestSent
497 } else if self
498 .incoming_contact_requests
499 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
500 .is_ok()
501 {
502 ContactRequestStatus::RequestReceived
503 } else {
504 ContactRequestStatus::None
505 }
506 }
507
508 pub fn request_contact(
509 &mut self,
510 responder_id: u64,
511 cx: &mut Context<Self>,
512 ) -> Task<Result<()>> {
513 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
514 }
515
516 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
517 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
518 }
519
520 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
521 self.incoming_contact_requests
522 .iter()
523 .any(|user| user.id == user_id)
524 }
525
526 pub fn respond_to_contact_request(
527 &mut self,
528 requester_id: u64,
529 accept: bool,
530 cx: &mut Context<Self>,
531 ) -> Task<Result<()>> {
532 self.perform_contact_request(
533 requester_id,
534 proto::RespondToContactRequest {
535 requester_id,
536 response: if accept {
537 proto::ContactRequestResponse::Accept
538 } else {
539 proto::ContactRequestResponse::Decline
540 } as i32,
541 },
542 cx,
543 )
544 }
545
546 pub fn dismiss_contact_request(
547 &self,
548 requester_id: u64,
549 cx: &Context<Self>,
550 ) -> Task<Result<()>> {
551 let client = self.client.upgrade();
552 cx.spawn(async move |_, _| {
553 client
554 .context("can't upgrade client reference")?
555 .request(proto::RespondToContactRequest {
556 requester_id,
557 response: proto::ContactRequestResponse::Dismiss as i32,
558 })
559 .await?;
560 Ok(())
561 })
562 }
563
564 fn perform_contact_request<T: RequestMessage>(
565 &mut self,
566 user_id: u64,
567 request: T,
568 cx: &mut Context<Self>,
569 ) -> Task<Result<()>> {
570 let client = self.client.upgrade();
571 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
572 cx.notify();
573
574 cx.spawn(async move |this, cx| {
575 let response = client
576 .context("can't upgrade client reference")?
577 .request(request)
578 .await;
579 this.update(cx, |this, cx| {
580 if let Entry::Occupied(mut request_count) =
581 this.pending_contact_requests.entry(user_id)
582 {
583 *request_count.get_mut() -= 1;
584 if *request_count.get() == 0 {
585 request_count.remove();
586 }
587 }
588 cx.notify();
589 })?;
590 response?;
591 Ok(())
592 })
593 }
594
595 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
596 let (tx, mut rx) = postage::barrier::channel();
597 self.update_contacts_tx
598 .unbounded_send(UpdateContacts::Clear(tx))
599 .unwrap();
600 async move {
601 rx.next().await;
602 }
603 }
604
605 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
606 let (tx, mut rx) = postage::barrier::channel();
607 self.update_contacts_tx
608 .unbounded_send(UpdateContacts::Wait(tx))
609 .unwrap();
610 async move {
611 rx.next().await;
612 }
613 }
614
615 pub fn get_users(
616 &self,
617 user_ids: Vec<u64>,
618 cx: &Context<Self>,
619 ) -> Task<Result<Vec<Arc<User>>>> {
620 let mut user_ids_to_fetch = user_ids.clone();
621 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
622
623 cx.spawn(async move |this, cx| {
624 if !user_ids_to_fetch.is_empty() {
625 this.update(cx, |this, cx| {
626 this.load_users(
627 proto::GetUsers {
628 user_ids: user_ids_to_fetch,
629 },
630 cx,
631 )
632 })?
633 .await?;
634 }
635
636 this.read_with(cx, |this, _| {
637 user_ids
638 .iter()
639 .map(|user_id| {
640 this.users
641 .get(user_id)
642 .cloned()
643 .with_context(|| format!("user {user_id} not found"))
644 })
645 .collect()
646 })?
647 })
648 }
649
650 pub fn fuzzy_search_users(
651 &self,
652 query: String,
653 cx: &Context<Self>,
654 ) -> Task<Result<Vec<Arc<User>>>> {
655 self.load_users(proto::FuzzySearchUsers { query }, cx)
656 }
657
658 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
659 self.users.get(&user_id).cloned()
660 }
661
662 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
663 if let Some(user) = self.users.get(&user_id).cloned() {
664 return Some(user);
665 }
666
667 self.get_user(user_id, cx).detach_and_log_err(cx);
668 None
669 }
670
671 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
672 if let Some(user) = self.users.get(&user_id).cloned() {
673 return Task::ready(Ok(user));
674 }
675
676 let load_users = self.get_users(vec![user_id], cx);
677 cx.spawn(async move |this, cx| {
678 load_users.await?;
679 this.read_with(cx, |this, _| {
680 this.users
681 .get(&user_id)
682 .cloned()
683 .context("server responded with no users")
684 })?
685 })
686 }
687
688 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
689 self.by_github_login
690 .get(github_login)
691 .and_then(|id| self.users.get(id).cloned())
692 }
693
694 pub fn current_user(&self) -> Option<Arc<User>> {
695 self.current_user.borrow().clone()
696 }
697
698 pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
699 #[cfg(debug_assertions)]
700 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
701 return match plan.as_str() {
702 "free" => Some(cloud_llm_client::Plan::ZedFree),
703 "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
704 "pro" => Some(cloud_llm_client::Plan::ZedPro),
705 _ => {
706 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
707 }
708 };
709 }
710
711 self.plan_info.as_ref().map(|info| info.plan)
712 }
713
714 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
715 self.plan_info
716 .as_ref()
717 .and_then(|plan| plan.subscription_period)
718 .map(|subscription_period| {
719 (
720 subscription_period.started_at.0,
721 subscription_period.ended_at.0,
722 )
723 })
724 }
725
726 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
727 self.plan_info
728 .as_ref()
729 .and_then(|plan| plan.trial_started_at)
730 .map(|trial_started_at| trial_started_at.0)
731 }
732
733 /// Returns whether the user's account is too new to use the service.
734 pub fn account_too_young(&self) -> bool {
735 self.plan_info
736 .as_ref()
737 .map(|plan| plan.is_account_too_young)
738 .unwrap_or_default()
739 }
740
741 /// Returns whether the current user has overdue invoices and usage should be blocked.
742 pub fn has_overdue_invoices(&self) -> bool {
743 self.plan_info
744 .as_ref()
745 .map(|plan| plan.has_overdue_invoices)
746 .unwrap_or_default()
747 }
748
749 pub fn is_usage_based_billing_enabled(&self) -> bool {
750 self.plan_info
751 .as_ref()
752 .map(|plan| plan.is_usage_based_billing_enabled)
753 .unwrap_or_default()
754 }
755
756 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
757 self.model_request_usage
758 }
759
760 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
761 self.model_request_usage = Some(usage);
762 cx.notify();
763 }
764
765 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
766 self.edit_prediction_usage
767 }
768
769 pub fn update_edit_prediction_usage(
770 &mut self,
771 usage: EditPredictionUsage,
772 cx: &mut Context<Self>,
773 ) {
774 self.edit_prediction_usage = Some(usage);
775 cx.notify();
776 }
777
778 fn update_authenticated_user(
779 &mut self,
780 response: GetAuthenticatedUserResponse,
781 cx: &mut Context<Self>,
782 ) {
783 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
784 cx.update_flags(staff, response.feature_flags);
785 if let Some(client) = self.client.upgrade() {
786 client
787 .telemetry
788 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
789 }
790
791 let accepted_tos_at = {
792 #[cfg(debug_assertions)]
793 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok() {
794 None
795 } else {
796 response.user.accepted_tos_at
797 }
798
799 #[cfg(not(debug_assertions))]
800 response.user.accepted_tos_at
801 };
802
803 self.accepted_tos_at = Some(accepted_tos_at);
804 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
805 limit: response.plan.usage.model_requests.limit,
806 amount: response.plan.usage.model_requests.used as i32,
807 }));
808 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
809 limit: response.plan.usage.edit_predictions.limit,
810 amount: response.plan.usage.edit_predictions.used as i32,
811 }));
812 self.plan_info = Some(response.plan);
813 cx.emit(Event::PrivateUserInfoUpdated);
814 }
815
816 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
817 self.current_user.clone()
818 }
819
820 pub fn has_accepted_terms_of_service(&self) -> bool {
821 self.accepted_tos_at
822 .map_or(false, |accepted_tos_at| accepted_tos_at.is_some())
823 }
824
825 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
826 if self.current_user().is_none() {
827 return Task::ready(Err(anyhow!("no current user")));
828 };
829
830 let client = self.client.clone();
831 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
832 let client = client.upgrade().context("client not found")?;
833 let response = client
834 .cloud_client()
835 .accept_terms_of_service()
836 .await
837 .context("error accepting tos")?;
838 this.update(cx, |this, cx| {
839 this.accepted_tos_at = Some(response.user.accepted_tos_at);
840 cx.emit(Event::PrivateUserInfoUpdated);
841 })?;
842 Ok(())
843 })
844 }
845
846 fn load_users(
847 &self,
848 request: impl RequestMessage<Response = UsersResponse>,
849 cx: &Context<Self>,
850 ) -> Task<Result<Vec<Arc<User>>>> {
851 let client = self.client.clone();
852 cx.spawn(async move |this, cx| {
853 if let Some(rpc) = client.upgrade() {
854 let response = rpc.request(request).await.context("error loading users")?;
855 let users = response.users;
856
857 this.update(cx, |this, _| this.insert(users))
858 } else {
859 Ok(Vec::new())
860 }
861 })
862 }
863
864 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
865 let mut ret = Vec::with_capacity(users.len());
866 for user in users {
867 let user = User::new(user);
868 if let Some(old) = self.users.insert(user.id, user.clone()) {
869 if old.github_login != user.github_login {
870 self.by_github_login.remove(&old.github_login);
871 }
872 }
873 self.by_github_login
874 .insert(user.github_login.clone(), user.id);
875 ret.push(user)
876 }
877 ret
878 }
879
880 pub fn set_participant_indices(
881 &mut self,
882 participant_indices: HashMap<u64, ParticipantIndex>,
883 cx: &mut Context<Self>,
884 ) {
885 if participant_indices != self.participant_indices {
886 self.participant_indices = participant_indices;
887 cx.emit(Event::ParticipantIndicesChanged);
888 }
889 }
890
891 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
892 &self.participant_indices
893 }
894
895 pub fn participant_names(
896 &self,
897 user_ids: impl Iterator<Item = u64>,
898 cx: &App,
899 ) -> HashMap<u64, SharedString> {
900 let mut ret = HashMap::default();
901 let mut missing_user_ids = Vec::new();
902 for id in user_ids {
903 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
904 ret.insert(id, github_login);
905 } else {
906 missing_user_ids.push(id)
907 }
908 }
909 if !missing_user_ids.is_empty() {
910 let this = self.weak_self.clone();
911 cx.spawn(async move |cx| {
912 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
913 .await
914 })
915 .detach_and_log_err(cx);
916 }
917 ret
918 }
919}
920
921impl User {
922 fn new(message: proto::User) -> Arc<Self> {
923 Arc::new(User {
924 id: message.id,
925 github_login: message.github_login.into(),
926 avatar_uri: message.avatar_url.into(),
927 name: message.name,
928 })
929 }
930}
931
932impl Contact {
933 async fn from_proto(
934 contact: proto::Contact,
935 user_store: &Entity<UserStore>,
936 cx: &mut AsyncApp,
937 ) -> Result<Self> {
938 let user = user_store
939 .update(cx, |user_store, cx| {
940 user_store.get_user(contact.user_id, cx)
941 })?
942 .await?;
943 Ok(Self {
944 user,
945 online: contact.online,
946 busy: contact.busy,
947 })
948 }
949}
950
951impl Collaborator {
952 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
953 Ok(Self {
954 peer_id: message.peer_id.context("invalid peer id")?,
955 replica_id: message.replica_id as ReplicaId,
956 user_id: message.user_id as UserId,
957 is_host: message.is_host,
958 committer_name: message.committer_name,
959 committer_email: message.committer_email,
960 })
961 }
962}
963
964impl RequestUsage {
965 pub fn over_limit(&self) -> bool {
966 match self.limit {
967 UsageLimit::Limited(limit) => self.amount >= limit,
968 UsageLimit::Unlimited => false,
969 }
970 }
971
972 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
973 let limit = match limit.variant? {
974 proto::usage_limit::Variant::Limited(limited) => {
975 UsageLimit::Limited(limited.limit as i32)
976 }
977 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
978 };
979 Some(RequestUsage {
980 limit,
981 amount: amount as i32,
982 })
983 }
984
985 fn from_headers(
986 limit_name: &str,
987 amount_name: &str,
988 headers: &HeaderMap<HeaderValue>,
989 ) -> Result<Self> {
990 let limit = headers
991 .get(limit_name)
992 .with_context(|| format!("missing {limit_name:?} header"))?;
993 let limit = UsageLimit::from_str(limit.to_str()?)?;
994
995 let amount = headers
996 .get(amount_name)
997 .with_context(|| format!("missing {amount_name:?} header"))?;
998 let amount = amount.to_str()?.parse::<i32>()?;
999
1000 Ok(Self { limit, amount })
1001 }
1002}
1003
1004impl ModelRequestUsage {
1005 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1006 Ok(Self(RequestUsage::from_headers(
1007 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1008 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1009 headers,
1010 )?))
1011 }
1012}
1013
1014impl EditPredictionUsage {
1015 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1016 Ok(Self(RequestUsage::from_headers(
1017 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1018 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1019 headers,
1020 )?))
1021 }
1022}