1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
8 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
9};
10use collections::{HashMap, HashSet, hash_map::Entry};
11use derive_more::Deref;
12use feature_flags::FeatureFlagAppExt;
13use futures::{Future, StreamExt, channel::mpsc};
14use gpui::{
15 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
16};
17use http_client::http::{HeaderMap, HeaderValue};
18use postage::{sink::Sink, watch};
19use rpc::proto::{RequestMessage, UsersResponse};
20use std::{
21 str::FromStr as _,
22 sync::{Arc, Weak},
23};
24use text::ReplicaId;
25use util::{ResultExt, TryFutureExt as _};
26
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
28
/// Newtype wrapping the raw `u64` identifier of a channel.
///
/// Displays as the bare number and is (de)serializable through serde.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
33
34impl std::fmt::Display for ChannelId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 self.0.fmt(f)
37 }
38}
39
/// Newtype wrapping the raw `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Unwraps the id into the plain `u64` it carries.
    pub fn to_proto(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
48
/// Newtype wrapping the raw `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
53
/// Newtype wrapping a `u32` index assigned to a participant.
///
/// `UserStore` keeps a map from user id to this index (see `participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
56
/// A user as cached by `UserStore`.
///
/// Equality and ordering are implemented by hand below rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login; also the key used to keep contact lists sorted.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional display name.
    pub name: Option<String>,
}
64
/// A collaborator, built from a `proto::Collaborator` message by `Collaborator::from_proto`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the protocol message (required there).
    pub peer_id: proto::PeerId,
    /// Replica id taken from the protocol message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Host flag copied from the protocol message.
    pub is_host: bool,
    /// Optional committer name copied from the protocol message.
    pub committer_name: Option<String>,
    /// Optional committer email copied from the protocol message.
    pub committer_email: Option<String>,
}
74
75impl PartialOrd for User {
76 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
77 Some(self.cmp(other))
78 }
79}
80
81impl Ord for User {
82 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
83 self.github_login.cmp(&other.github_login)
84 }
85}
86
87impl PartialEq for User {
88 fn eq(&self, other: &Self) -> bool {
89 self.id == other.id && self.github_login == other.github_login
90 }
91}
92
93impl Eq for User {}
94
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the protocol message.
    pub online: bool,
    /// `busy` flag copied from the protocol message.
    pub busy: bool,
}
101
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any pending request.
    None,
    /// The user is in the list of outgoing contact requests.
    RequestSent,
    /// The user is in the list of incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
109
/// Caches users, tracks the signed-in user and their plan/usage, and maintains
/// the contact list and pending contact requests.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Reverse index from GitHub login to user id for entries in `users`.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id, replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Queue feeding the `_maintain_contacts` task, so contact updates are applied in order.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Last known model-request usage, if any has been reported.
    model_request_usage: Option<ModelRequestUsage>,
    /// Last known edit-prediction usage, if any has been reported.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information from the last authenticated-user response.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the signed-in user, or `None` when there is none.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: not loaded (also reset on sign-out). Inner `None`: loaded, but the
    /// terms of service have not been accepted.
    accepted_tos_at: Option<Option<cloud_api_client::Timestamp>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact RPCs per user id; entries are removed when they reach zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, upgraded on demand before issuing requests.
    client: Weak<Client>,
    /// Background task draining `update_contacts_tx`; it also owns the RPC subscriptions.
    _maintain_contacts: Task<()>,
    /// Background task reacting to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used where only `&App` is available.
    weak_self: WeakEntity<Self>,
}
130
/// Invite information received through a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message. NOTE(review): presumably remaining invites — confirm.
    pub count: u32,
    /// `url` field of the message.
    pub url: Arc<str>,
}
136
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` actually changes the stored map.
    ParticipantIndicesChanged,
    /// Emitted when the authenticated user's private info (e.g. terms-of-service
    /// acceptance) is refreshed or cleared.
    PrivateUserInfoUpdated,
    /// Not emitted anywhere in this part of the file.
    PlanUpdated,
}
147
/// What happened to the contact request carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming request is removed by a contacts update.
    Cancelled,
}
154
// Lets `UserStore` emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
156
/// Messages processed one at a time by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing but drop the barrier, signalling that all earlier messages were processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
162
/// Usage of model requests; derefs to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
165
/// Usage of edit predictions; derefs to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
168
/// An amount of usage together with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The applicable limit (either a concrete number or unlimited).
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
174
175impl UserStore {
    /// Creates the store, registers its message handlers on `client`, and spawns the
    /// two background tasks that keep contacts and the current user up to date.
    ///
    /// Only a weak reference to `client` is retained, so the store does not keep the
    /// client alive.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            plan_info: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moved into the task so the subscriptions live exactly as long as it does.
                let _subscriptions = rpc_subscriptions;
                // Messages are handled strictly one after another: each update task is
                // awaited before the next message is taken from the queue.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store entity is gone; nothing left to maintain.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference while waiting, so this task does not keep
                // the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no current user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            // Cache the current user like any other user,
                                            // then apply plan, usage and ToS info.
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Forget ToS acceptance and wait until the contacts are cleared.
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Contacts are cleared, but the current user is kept.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
297
298 #[cfg(feature = "test-support")]
299 pub fn clear_cache(&mut self) {
300 self.users.clear();
301 self.by_github_login.clear();
302 }
303
304 async fn handle_update_invite_info(
305 this: Entity<Self>,
306 message: TypedEnvelope<proto::UpdateInviteInfo>,
307 mut cx: AsyncApp,
308 ) -> Result<()> {
309 this.update(&mut cx, |this, cx| {
310 this.invite_info = Some(InviteInfo {
311 url: Arc::from(message.payload.url),
312 count: message.payload.count,
313 });
314 cx.notify();
315 })?;
316 Ok(())
317 }
318
319 async fn handle_show_contacts(
320 this: Entity<Self>,
321 _: TypedEnvelope<proto::ShowContacts>,
322 mut cx: AsyncApp,
323 ) -> Result<()> {
324 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
325 Ok(())
326 }
327
328 pub fn invite_info(&self) -> Option<&InviteInfo> {
329 self.invite_info.as_ref()
330 }
331
332 async fn handle_update_contacts(
333 this: Entity<Self>,
334 message: TypedEnvelope<proto::UpdateContacts>,
335 cx: AsyncApp,
336 ) -> Result<()> {
337 this.read_with(&cx, |this, _| {
338 this.update_contacts_tx
339 .unbounded_send(UpdateContacts::Update(message.payload))
340 .unwrap();
341 })?;
342 Ok(())
343 }
344
    /// Applies one queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops its barrier, signalling the waiter.
    /// * `Clear` empties the contacts and both request lists, then drops its barrier.
    /// * `Update` loads every referenced user, then applies removals and upserts to the
    ///   three lists, keeping each sorted by GitHub login.
    ///
    /// The returned task resolves once the message has been fully applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be fetched
                // in a single request.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        // (binary search keeps the list sorted by GitHub login).
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled` event
                        // for each one that disappears.
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests (no event is emitted for these)
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
464
465 pub fn contacts(&self) -> &[Arc<Contact>] {
466 &self.contacts
467 }
468
469 pub fn has_contact(&self, user: &Arc<User>) -> bool {
470 self.contacts
471 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
472 .is_ok()
473 }
474
475 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
476 &self.incoming_contact_requests
477 }
478
479 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
480 &self.outgoing_contact_requests
481 }
482
483 pub fn is_contact_request_pending(&self, user: &User) -> bool {
484 self.pending_contact_requests.contains_key(&user.id)
485 }
486
487 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
488 if self
489 .contacts
490 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
491 .is_ok()
492 {
493 ContactRequestStatus::RequestAccepted
494 } else if self
495 .outgoing_contact_requests
496 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
497 .is_ok()
498 {
499 ContactRequestStatus::RequestSent
500 } else if self
501 .incoming_contact_requests
502 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
503 .is_ok()
504 {
505 ContactRequestStatus::RequestReceived
506 } else {
507 ContactRequestStatus::None
508 }
509 }
510
511 pub fn request_contact(
512 &mut self,
513 responder_id: u64,
514 cx: &mut Context<Self>,
515 ) -> Task<Result<()>> {
516 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
517 }
518
519 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
520 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
521 }
522
523 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
524 self.incoming_contact_requests
525 .iter()
526 .any(|user| user.id == user_id)
527 }
528
529 pub fn respond_to_contact_request(
530 &mut self,
531 requester_id: u64,
532 accept: bool,
533 cx: &mut Context<Self>,
534 ) -> Task<Result<()>> {
535 self.perform_contact_request(
536 requester_id,
537 proto::RespondToContactRequest {
538 requester_id,
539 response: if accept {
540 proto::ContactRequestResponse::Accept
541 } else {
542 proto::ContactRequestResponse::Decline
543 } as i32,
544 },
545 cx,
546 )
547 }
548
549 pub fn dismiss_contact_request(
550 &self,
551 requester_id: u64,
552 cx: &Context<Self>,
553 ) -> Task<Result<()>> {
554 let client = self.client.upgrade();
555 cx.spawn(async move |_, _| {
556 client
557 .context("can't upgrade client reference")?
558 .request(proto::RespondToContactRequest {
559 requester_id,
560 response: proto::ContactRequestResponse::Dismiss as i32,
561 })
562 .await?;
563 Ok(())
564 })
565 }
566
567 fn perform_contact_request<T: RequestMessage>(
568 &mut self,
569 user_id: u64,
570 request: T,
571 cx: &mut Context<Self>,
572 ) -> Task<Result<()>> {
573 let client = self.client.upgrade();
574 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
575 cx.notify();
576
577 cx.spawn(async move |this, cx| {
578 let response = client
579 .context("can't upgrade client reference")?
580 .request(request)
581 .await;
582 this.update(cx, |this, cx| {
583 if let Entry::Occupied(mut request_count) =
584 this.pending_contact_requests.entry(user_id)
585 {
586 *request_count.get_mut() -= 1;
587 if *request_count.get() == 0 {
588 request_count.remove();
589 }
590 }
591 cx.notify();
592 })?;
593 response?;
594 Ok(())
595 })
596 }
597
598 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
599 let (tx, mut rx) = postage::barrier::channel();
600 self.update_contacts_tx
601 .unbounded_send(UpdateContacts::Clear(tx))
602 .unwrap();
603 async move {
604 rx.next().await;
605 }
606 }
607
608 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
609 let (tx, mut rx) = postage::barrier::channel();
610 self.update_contacts_tx
611 .unbounded_send(UpdateContacts::Wait(tx))
612 .unwrap();
613 async move {
614 rx.next().await;
615 }
616 }
617
618 pub fn get_users(
619 &self,
620 user_ids: Vec<u64>,
621 cx: &Context<Self>,
622 ) -> Task<Result<Vec<Arc<User>>>> {
623 let mut user_ids_to_fetch = user_ids.clone();
624 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
625
626 cx.spawn(async move |this, cx| {
627 if !user_ids_to_fetch.is_empty() {
628 this.update(cx, |this, cx| {
629 this.load_users(
630 proto::GetUsers {
631 user_ids: user_ids_to_fetch,
632 },
633 cx,
634 )
635 })?
636 .await?;
637 }
638
639 this.read_with(cx, |this, _| {
640 user_ids
641 .iter()
642 .map(|user_id| {
643 this.users
644 .get(user_id)
645 .cloned()
646 .with_context(|| format!("user {user_id} not found"))
647 })
648 .collect()
649 })?
650 })
651 }
652
653 pub fn fuzzy_search_users(
654 &self,
655 query: String,
656 cx: &Context<Self>,
657 ) -> Task<Result<Vec<Arc<User>>>> {
658 self.load_users(proto::FuzzySearchUsers { query }, cx)
659 }
660
661 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
662 self.users.get(&user_id).cloned()
663 }
664
665 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
666 if let Some(user) = self.users.get(&user_id).cloned() {
667 return Some(user);
668 }
669
670 self.get_user(user_id, cx).detach_and_log_err(cx);
671 None
672 }
673
674 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
675 if let Some(user) = self.users.get(&user_id).cloned() {
676 return Task::ready(Ok(user));
677 }
678
679 let load_users = self.get_users(vec![user_id], cx);
680 cx.spawn(async move |this, cx| {
681 load_users.await?;
682 this.read_with(cx, |this, _| {
683 this.users
684 .get(&user_id)
685 .cloned()
686 .context("server responded with no users")
687 })?
688 })
689 }
690
691 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
692 self.by_github_login
693 .get(github_login)
694 .and_then(|id| self.users.get(id).cloned())
695 }
696
697 pub fn current_user(&self) -> Option<Arc<User>> {
698 self.current_user.borrow().clone()
699 }
700
701 pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
702 #[cfg(debug_assertions)]
703 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
704 return match plan.as_str() {
705 "free" => Some(cloud_llm_client::Plan::ZedFree),
706 "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
707 "pro" => Some(cloud_llm_client::Plan::ZedPro),
708 _ => {
709 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
710 }
711 };
712 }
713
714 self.plan_info.as_ref().map(|info| info.plan)
715 }
716
717 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
718 self.plan_info
719 .as_ref()
720 .and_then(|plan| plan.subscription_period)
721 .map(|subscription_period| {
722 (
723 subscription_period.started_at.0,
724 subscription_period.ended_at.0,
725 )
726 })
727 }
728
729 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
730 self.plan_info
731 .as_ref()
732 .and_then(|plan| plan.trial_started_at)
733 .map(|trial_started_at| trial_started_at.0)
734 }
735
736 /// Returns whether the user's account is too new to use the service.
737 pub fn account_too_young(&self) -> bool {
738 self.plan_info
739 .as_ref()
740 .map(|plan| plan.is_account_too_young)
741 .unwrap_or_default()
742 }
743
744 /// Returns whether the current user has overdue invoices and usage should be blocked.
745 pub fn has_overdue_invoices(&self) -> bool {
746 self.plan_info
747 .as_ref()
748 .map(|plan| plan.has_overdue_invoices)
749 .unwrap_or_default()
750 }
751
752 pub fn is_usage_based_billing_enabled(&self) -> bool {
753 self.plan_info
754 .as_ref()
755 .map(|plan| plan.is_usage_based_billing_enabled)
756 .unwrap_or_default()
757 }
758
759 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
760 self.model_request_usage
761 }
762
763 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
764 self.model_request_usage = Some(usage);
765 cx.notify();
766 }
767
768 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
769 self.edit_prediction_usage
770 }
771
772 pub fn update_edit_prediction_usage(
773 &mut self,
774 usage: EditPredictionUsage,
775 cx: &mut Context<Self>,
776 ) {
777 self.edit_prediction_usage = Some(usage);
778 cx.notify();
779 }
780
781 fn update_authenticated_user(
782 &mut self,
783 response: GetAuthenticatedUserResponse,
784 cx: &mut Context<Self>,
785 ) {
786 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
787 cx.update_flags(staff, response.feature_flags);
788 if let Some(client) = self.client.upgrade() {
789 client
790 .telemetry
791 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
792 }
793
794 let accepted_tos_at = {
795 #[cfg(debug_assertions)]
796 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok() {
797 None
798 } else {
799 response.user.accepted_tos_at
800 }
801
802 #[cfg(not(debug_assertions))]
803 response.user.accepted_tos_at
804 };
805
806 self.accepted_tos_at = Some(accepted_tos_at);
807 self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
808 limit: response.plan.usage.model_requests.limit,
809 amount: response.plan.usage.model_requests.used as i32,
810 }));
811 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
812 limit: response.plan.usage.edit_predictions.limit,
813 amount: response.plan.usage.edit_predictions.used as i32,
814 }));
815 self.plan_info = Some(response.plan);
816 cx.emit(Event::PrivateUserInfoUpdated);
817 }
818
819 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
820 cx.spawn(async move |cx| {
821 match message {
822 MessageToClient::UserUpdated => {
823 let cloud_client = cx
824 .update(|cx| {
825 this.read_with(cx, |this, _cx| {
826 this.client.upgrade().map(|client| client.cloud_client())
827 })
828 })??
829 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
830
831 let response = cloud_client.get_authenticated_user().await?;
832 cx.update(|cx| {
833 this.update(cx, |this, cx| {
834 this.update_authenticated_user(response, cx);
835 })
836 })??;
837 }
838 }
839
840 anyhow::Ok(())
841 })
842 .detach_and_log_err(cx);
843 }
844
845 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
846 self.current_user.clone()
847 }
848
849 pub fn has_accepted_terms_of_service(&self) -> bool {
850 self.accepted_tos_at
851 .is_some_and(|accepted_tos_at| accepted_tos_at.is_some())
852 }
853
854 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
855 if self.current_user().is_none() {
856 return Task::ready(Err(anyhow!("no current user")));
857 };
858
859 let client = self.client.clone();
860 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
861 let client = client.upgrade().context("client not found")?;
862 let response = client
863 .cloud_client()
864 .accept_terms_of_service()
865 .await
866 .context("error accepting tos")?;
867 this.update(cx, |this, cx| {
868 this.accepted_tos_at = Some(response.user.accepted_tos_at);
869 cx.emit(Event::PrivateUserInfoUpdated);
870 })?;
871 Ok(())
872 })
873 }
874
875 fn load_users(
876 &self,
877 request: impl RequestMessage<Response = UsersResponse>,
878 cx: &Context<Self>,
879 ) -> Task<Result<Vec<Arc<User>>>> {
880 let client = self.client.clone();
881 cx.spawn(async move |this, cx| {
882 if let Some(rpc) = client.upgrade() {
883 let response = rpc.request(request).await.context("error loading users")?;
884 let users = response.users;
885
886 this.update(cx, |this, _| this.insert(users))
887 } else {
888 Ok(Vec::new())
889 }
890 })
891 }
892
893 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
894 let mut ret = Vec::with_capacity(users.len());
895 for user in users {
896 let user = User::new(user);
897 if let Some(old) = self.users.insert(user.id, user.clone())
898 && old.github_login != user.github_login
899 {
900 self.by_github_login.remove(&old.github_login);
901 }
902 self.by_github_login
903 .insert(user.github_login.clone(), user.id);
904 ret.push(user)
905 }
906 ret
907 }
908
909 pub fn set_participant_indices(
910 &mut self,
911 participant_indices: HashMap<u64, ParticipantIndex>,
912 cx: &mut Context<Self>,
913 ) {
914 if participant_indices != self.participant_indices {
915 self.participant_indices = participant_indices;
916 cx.emit(Event::ParticipantIndicesChanged);
917 }
918 }
919
920 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
921 &self.participant_indices
922 }
923
924 pub fn participant_names(
925 &self,
926 user_ids: impl Iterator<Item = u64>,
927 cx: &App,
928 ) -> HashMap<u64, SharedString> {
929 let mut ret = HashMap::default();
930 let mut missing_user_ids = Vec::new();
931 for id in user_ids {
932 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
933 ret.insert(id, github_login);
934 } else {
935 missing_user_ids.push(id)
936 }
937 }
938 if !missing_user_ids.is_empty() {
939 let this = self.weak_self.clone();
940 cx.spawn(async move |cx| {
941 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
942 .await
943 })
944 .detach_and_log_err(cx);
945 }
946 ret
947 }
948}
949
950impl User {
951 fn new(message: proto::User) -> Arc<Self> {
952 Arc::new(User {
953 id: message.id,
954 github_login: message.github_login.into(),
955 avatar_uri: message.avatar_url.into(),
956 name: message.name,
957 })
958 }
959}
960
961impl Contact {
962 async fn from_proto(
963 contact: proto::Contact,
964 user_store: &Entity<UserStore>,
965 cx: &mut AsyncApp,
966 ) -> Result<Self> {
967 let user = user_store
968 .update(cx, |user_store, cx| {
969 user_store.get_user(contact.user_id, cx)
970 })?
971 .await?;
972 Ok(Self {
973 user,
974 online: contact.online,
975 busy: contact.busy,
976 })
977 }
978}
979
980impl Collaborator {
981 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
982 Ok(Self {
983 peer_id: message.peer_id.context("invalid peer id")?,
984 replica_id: message.replica_id as ReplicaId,
985 user_id: message.user_id as UserId,
986 is_host: message.is_host,
987 committer_name: message.committer_name,
988 committer_email: message.committer_email,
989 })
990 }
991}
992
993impl RequestUsage {
994 pub fn over_limit(&self) -> bool {
995 match self.limit {
996 UsageLimit::Limited(limit) => self.amount >= limit,
997 UsageLimit::Unlimited => false,
998 }
999 }
1000
1001 fn from_headers(
1002 limit_name: &str,
1003 amount_name: &str,
1004 headers: &HeaderMap<HeaderValue>,
1005 ) -> Result<Self> {
1006 let limit = headers
1007 .get(limit_name)
1008 .with_context(|| format!("missing {limit_name:?} header"))?;
1009 let limit = UsageLimit::from_str(limit.to_str()?)?;
1010
1011 let amount = headers
1012 .get(amount_name)
1013 .with_context(|| format!("missing {amount_name:?} header"))?;
1014 let amount = amount.to_str()?.parse::<i32>()?;
1015
1016 Ok(Self { limit, amount })
1017 }
1018}
1019
1020impl ModelRequestUsage {
1021 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1022 Ok(Self(RequestUsage::from_headers(
1023 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1024 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1025 headers,
1026 )?))
1027 }
1028}
1029
1030impl EditPredictionUsage {
1031 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1032 Ok(Self(RequestUsage::from_headers(
1033 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1034 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1035 headers,
1036 )?))
1037 }
1038}