1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{
6 GetAuthenticatedUserResponse, Organization, OrganizationId, Plan, PlanInfo,
7};
8use cloud_llm_client::{
9 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
10};
11use collections::{HashMap, HashSet, hash_map::Entry};
12use derive_more::Deref;
13use feature_flags::FeatureFlagAppExt;
14use futures::{Future, StreamExt, channel::mpsc};
15use gpui::{
16 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
17};
18use http_client::http::{HeaderMap, HeaderValue};
19use postage::{sink::Sink, watch};
20use rpc::proto::{RequestMessage, UsersResponse};
21use std::{
22 str::FromStr as _,
23 sync::{Arc, Weak},
24};
25use text::ReplicaId;
26use util::{ResultExt, TryFutureExt as _};
27
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
29
/// Newtype wrapping a `u64` channel identifier.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
34
35impl std::fmt::Display for ChannelId {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 self.0.fmt(f)
38 }
39}
40
/// Newtype wrapping a `u64` project identifier.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
43
44impl ProjectId {
45 pub fn to_proto(self) -> u64 {
46 self.0
47 }
48}
49
/// Newtype wrapping a `u32` participant index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
52
/// A user as cached by [`UserStore`].
///
/// Equality and ordering are implemented by hand further down in this file
/// rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login; used as the sort key for contact lists.
    pub github_login: SharedString,
    /// URI of the user's avatar.
    pub avatar_uri: SharedUri,
    /// Optional name of the user.
    pub name: Option<String>,
}
60
/// A collaborator, built from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the proto message.
    pub peer_id: proto::PeerId,
    /// Replica id taken from the proto message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// The `is_host` flag from the proto message.
    pub is_host: bool,
    /// Optional committer name from the proto message.
    pub committer_name: Option<String>,
    /// Optional committer email from the proto message.
    pub committer_email: Option<String>,
}
70
71impl PartialOrd for User {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
77impl Ord for User {
78 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
79 self.github_login.cmp(&other.github_login)
80 }
81}
82
83impl PartialEq for User {
84 fn eq(&self, other: &Self) -> bool {
85 self.id == other.id && self.github_login == other.github_login
86 }
87}
88
// Marker impl: the hand-written `PartialEq` compares only `id` and
// `github_login`, both of which have total equality.
impl Eq for User {}
90
/// A contact entry: a user plus the `online`/`busy` flags received in a
/// `proto::Contact` message (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// The `online` flag from the proto message.
    pub online: bool,
    /// The `busy` flag from the proto message.
    pub busy: bool,
}
97
/// Relationship between the store's contact lists and a given user, as
/// reported by `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the lists.
    None,
    /// The user is in the outgoing contact requests.
    RequestSent,
    /// The user is in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
105
/// Caches users, contacts, contact requests, organizations, and plan/usage
/// information, and owns the background tasks that keep them up to date.
pub struct UserStore {
    // Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    // Index from GitHub login to user id for entries in `users`.
    by_github_login: HashMap<SharedString, u64>,
    participant_indices: HashMap<u64, ParticipantIndex>,
    // Queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    edit_prediction_usage: Option<EditPredictionUsage>,
    plan_info: Option<PlanInfo>,
    current_user: watch::Receiver<Option<Arc<User>>>,
    current_organization: Option<Arc<Organization>>,
    organizations: Vec<Arc<Organization>>,
    plans_by_organization: HashMap<OrganizationId, Plan>,
    // The three lists below are kept sorted by GitHub login so that they can
    // be searched with a binary search.
    contacts: Vec<Arc<Contact>>,
    incoming_contact_requests: Vec<Arc<User>>,
    outgoing_contact_requests: Vec<Arc<User>>,
    // Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    client: Weak<Client>,
    // Task handles are stored so the tasks live as long as the store.
    _maintain_contacts: Task<()>,
    _maintain_current_user: Task<Result<()>>,
    _handle_sign_out: Task<()>,
    weak_self: WeakEntity<Self>,
}
127
/// Invite information: a count and a URL.
// NOTE(review): not referenced elsewhere in this file; presumably used by callers — confirm.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
133
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change for `user`; emitted with
    /// `ContactEventKind::Cancelled` when an incoming request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different map.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user's info is refreshed and on sign-out.
    PrivateUserInfoUpdated,
    PlanUpdated,
    /// Emitted when `set_current_organization` switches to a different organization.
    OrganizationChanged,
}
145
/// The kind of change carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
152
// Lets `UserStore` emit `Event` values through its GPUI context.
impl EventEmitter<Event> for UserStore {}
154
/// Messages processed, in order, by the contacts maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, which lets a waiter know that
    /// all previously queued messages have been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
160
/// Edit prediction usage; a newtype over [`RequestUsage`] that derefs to it.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
163
/// A usage amount together with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that `amount` is compared against by `over_limit`.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
169
170impl UserStore {
    /// Creates the store and spawns its background tasks.
    ///
    /// Registers RPC handlers for contact updates and "show contacts" messages
    /// on `client`, installs the sign-out channel and the message-to-client
    /// handler, and spawns three tasks whose handles are kept in the store:
    /// - `_maintain_contacts` applies queued `UpdateContacts` messages one at a time;
    /// - `_maintain_current_user` reacts to client status changes;
    /// - `_handle_sign_out` signs the client out whenever a sign-out request arrives.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.sign_out_tx.lock().replace(sign_out_tx);
        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_organization: None,
            organizations: Vec::new(),
            plans_by_organization: HashMap::default(),
            plan_info: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Keep the RPC handler subscriptions owned by this task.
                let _subscriptions = rpc_subscriptions;
                // Messages are applied strictly one after another: each update
                // task is awaited before the next message is taken.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store entity is gone; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference while waiting for status changes.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the (possibly absent) user to watchers
                                // before updating the store itself.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            // Cache the current user and refresh
                                            // organizations, plan, and usage.
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })?;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Drop everything tied to the signed-in account, then
                            // wait until the contacts have been cleared.
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.clear_organizations();
                                this.clear_plan_and_usage();
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Only the contacts are cleared; the current user,
                            // organizations, and plan are left untouched.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            _handle_sign_out: cx.spawn(async move |this, cx| {
                while let Some(()) = sign_out_rx.next().await {
                    // Stop when either the store or the client no longer exists.
                    let Some(client) = this
                        .read_with(cx, |this, _cx| this.client.upgrade())
                        .ok()
                        .flatten()
                    else {
                        break;
                    };

                    client.sign_out(cx).await;
                }
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
309
310 #[cfg(feature = "test-support")]
311 pub fn clear_cache(&mut self) {
312 self.users.clear();
313 self.by_github_login.clear();
314 }
315
316 async fn handle_show_contacts(
317 this: Entity<Self>,
318 _: TypedEnvelope<proto::ShowContacts>,
319 mut cx: AsyncApp,
320 ) -> Result<()> {
321 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
322 Ok(())
323 }
324
325 async fn handle_update_contacts(
326 this: Entity<Self>,
327 message: TypedEnvelope<proto::UpdateContacts>,
328 cx: AsyncApp,
329 ) -> Result<()> {
330 this.read_with(&cx, |this, _| {
331 this.update_contacts_tx
332 .unbounded_send(UpdateContacts::Update(message.payload))
333 .unwrap();
334 });
335 Ok(())
336 }
337
    /// Applies one queued [`UpdateContacts`] message to the store.
    ///
    /// `Wait` and `Clear` complete immediately and drop their barrier sender
    /// when done. `Update` first loads every referenced user, then removes,
    /// replaces, and inserts entries in the contact and request lists, keeping
    /// each list sorted by GitHub login, and finally notifies observers.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user mentioned by the update so they
                // can be fetched with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
457
458 pub fn contacts(&self) -> &[Arc<Contact>] {
459 &self.contacts
460 }
461
462 pub fn has_contact(&self, user: &Arc<User>) -> bool {
463 self.contacts
464 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
465 .is_ok()
466 }
467
468 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
469 &self.incoming_contact_requests
470 }
471
472 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
473 &self.outgoing_contact_requests
474 }
475
476 pub fn is_contact_request_pending(&self, user: &User) -> bool {
477 self.pending_contact_requests.contains_key(&user.id)
478 }
479
480 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
481 if self
482 .contacts
483 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
484 .is_ok()
485 {
486 ContactRequestStatus::RequestAccepted
487 } else if self
488 .outgoing_contact_requests
489 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
490 .is_ok()
491 {
492 ContactRequestStatus::RequestSent
493 } else if self
494 .incoming_contact_requests
495 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
496 .is_ok()
497 {
498 ContactRequestStatus::RequestReceived
499 } else {
500 ContactRequestStatus::None
501 }
502 }
503
504 pub fn request_contact(
505 &mut self,
506 responder_id: u64,
507 cx: &mut Context<Self>,
508 ) -> Task<Result<()>> {
509 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
510 }
511
512 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
513 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
514 }
515
516 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
517 self.incoming_contact_requests
518 .iter()
519 .any(|user| user.id == user_id)
520 }
521
522 pub fn respond_to_contact_request(
523 &mut self,
524 requester_id: u64,
525 accept: bool,
526 cx: &mut Context<Self>,
527 ) -> Task<Result<()>> {
528 self.perform_contact_request(
529 requester_id,
530 proto::RespondToContactRequest {
531 requester_id,
532 response: if accept {
533 proto::ContactRequestResponse::Accept
534 } else {
535 proto::ContactRequestResponse::Decline
536 } as i32,
537 },
538 cx,
539 )
540 }
541
542 pub fn dismiss_contact_request(
543 &self,
544 requester_id: u64,
545 cx: &Context<Self>,
546 ) -> Task<Result<()>> {
547 let client = self.client.upgrade();
548 cx.spawn(async move |_, _| {
549 client
550 .context("can't upgrade client reference")?
551 .request(proto::RespondToContactRequest {
552 requester_id,
553 response: proto::ContactRequestResponse::Dismiss as i32,
554 })
555 .await?;
556 Ok(())
557 })
558 }
559
560 fn perform_contact_request<T: RequestMessage>(
561 &mut self,
562 user_id: u64,
563 request: T,
564 cx: &mut Context<Self>,
565 ) -> Task<Result<()>> {
566 let client = self.client.upgrade();
567 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
568 cx.notify();
569
570 cx.spawn(async move |this, cx| {
571 let response = client
572 .context("can't upgrade client reference")?
573 .request(request)
574 .await;
575 this.update(cx, |this, cx| {
576 if let Entry::Occupied(mut request_count) =
577 this.pending_contact_requests.entry(user_id)
578 {
579 *request_count.get_mut() -= 1;
580 if *request_count.get() == 0 {
581 request_count.remove();
582 }
583 }
584 cx.notify();
585 })?;
586 response?;
587 Ok(())
588 })
589 }
590
591 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
592 let (tx, mut rx) = postage::barrier::channel();
593 self.update_contacts_tx
594 .unbounded_send(UpdateContacts::Clear(tx))
595 .unwrap();
596 async move {
597 rx.next().await;
598 }
599 }
600
601 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
602 let (tx, mut rx) = postage::barrier::channel();
603 self.update_contacts_tx
604 .unbounded_send(UpdateContacts::Wait(tx))
605 .unwrap();
606 async move {
607 rx.next().await;
608 }
609 }
610
611 pub fn get_users(
612 &self,
613 user_ids: Vec<u64>,
614 cx: &Context<Self>,
615 ) -> Task<Result<Vec<Arc<User>>>> {
616 let mut user_ids_to_fetch = user_ids.clone();
617 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
618
619 cx.spawn(async move |this, cx| {
620 if !user_ids_to_fetch.is_empty() {
621 this.update(cx, |this, cx| {
622 this.load_users(
623 proto::GetUsers {
624 user_ids: user_ids_to_fetch,
625 },
626 cx,
627 )
628 })?
629 .await?;
630 }
631
632 this.read_with(cx, |this, _| {
633 user_ids
634 .iter()
635 .map(|user_id| {
636 this.users
637 .get(user_id)
638 .cloned()
639 .with_context(|| format!("user {user_id} not found"))
640 })
641 .collect()
642 })?
643 })
644 }
645
646 pub fn fuzzy_search_users(
647 &self,
648 query: String,
649 cx: &Context<Self>,
650 ) -> Task<Result<Vec<Arc<User>>>> {
651 self.load_users(proto::FuzzySearchUsers { query }, cx)
652 }
653
654 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
655 self.users.get(&user_id).cloned()
656 }
657
658 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
659 if let Some(user) = self.users.get(&user_id).cloned() {
660 return Some(user);
661 }
662
663 self.get_user(user_id, cx).detach_and_log_err(cx);
664 None
665 }
666
667 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
668 if let Some(user) = self.users.get(&user_id).cloned() {
669 return Task::ready(Ok(user));
670 }
671
672 let load_users = self.get_users(vec![user_id], cx);
673 cx.spawn(async move |this, cx| {
674 load_users.await?;
675 this.read_with(cx, |this, _| {
676 this.users
677 .get(&user_id)
678 .cloned()
679 .context("server responded with no users")
680 })?
681 })
682 }
683
684 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
685 self.by_github_login
686 .get(github_login)
687 .and_then(|id| self.users.get(id).cloned())
688 }
689
690 pub fn current_user(&self) -> Option<Arc<User>> {
691 self.current_user.borrow().clone()
692 }
693
694 pub fn current_organization(&self) -> Option<Arc<Organization>> {
695 self.current_organization.clone()
696 }
697
698 pub fn set_current_organization(
699 &mut self,
700 organization: Arc<Organization>,
701 cx: &mut Context<Self>,
702 ) {
703 let is_same_organization = self
704 .current_organization
705 .as_ref()
706 .is_some_and(|current| current.id == organization.id);
707
708 if !is_same_organization {
709 self.current_organization.replace(organization);
710 cx.emit(Event::OrganizationChanged);
711 cx.notify();
712 }
713 }
714
715 pub fn organizations(&self) -> &Vec<Arc<Organization>> {
716 &self.organizations
717 }
718
719 pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
720 self.plans_by_organization.get(organization_id).copied()
721 }
722
723 pub fn plan(&self) -> Option<Plan> {
724 #[cfg(debug_assertions)]
725 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
726 use cloud_api_client::Plan;
727
728 return match plan.as_str() {
729 "free" => Some(Plan::ZedFree),
730 "trial" => Some(Plan::ZedProTrial),
731 "pro" => Some(Plan::ZedPro),
732 _ => {
733 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
734 }
735 };
736 }
737
738 if let Some(organization) = &self.current_organization
739 && let Some(plan) = self.plan_for_organization(&organization.id)
740 {
741 return Some(plan);
742 }
743
744 self.plan_info.as_ref().map(|info| info.plan())
745 }
746
747 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
748 self.plan_info
749 .as_ref()
750 .and_then(|plan| plan.subscription_period)
751 .map(|subscription_period| {
752 (
753 subscription_period.started_at.0,
754 subscription_period.ended_at.0,
755 )
756 })
757 }
758
759 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
760 self.plan_info
761 .as_ref()
762 .and_then(|plan| plan.trial_started_at)
763 .map(|trial_started_at| trial_started_at.0)
764 }
765
766 /// Returns whether the user's account is too new to use the service.
767 pub fn account_too_young(&self) -> bool {
768 self.plan_info
769 .as_ref()
770 .map(|plan| plan.is_account_too_young)
771 .unwrap_or_default()
772 }
773
774 /// Returns whether the current user has overdue invoices and usage should be blocked.
775 pub fn has_overdue_invoices(&self) -> bool {
776 self.plan_info
777 .as_ref()
778 .map(|plan| plan.has_overdue_invoices)
779 .unwrap_or_default()
780 }
781
782 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
783 self.edit_prediction_usage
784 }
785
786 pub fn update_edit_prediction_usage(
787 &mut self,
788 usage: EditPredictionUsage,
789 cx: &mut Context<Self>,
790 ) {
791 self.edit_prediction_usage = Some(usage);
792 cx.notify();
793 }
794
795 pub fn clear_organizations(&mut self) {
796 self.organizations.clear();
797 self.current_organization = None;
798 }
799
800 pub fn clear_plan_and_usage(&mut self) {
801 self.plan_info = None;
802 self.edit_prediction_usage = None;
803 }
804
805 fn update_authenticated_user(
806 &mut self,
807 response: GetAuthenticatedUserResponse,
808 cx: &mut Context<Self>,
809 ) {
810 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
811 cx.update_flags(staff, response.feature_flags);
812 if let Some(client) = self.client.upgrade() {
813 client
814 .telemetry
815 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
816 }
817
818 self.organizations = response.organizations.into_iter().map(Arc::new).collect();
819 self.current_organization = self.organizations.first().cloned();
820
821 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
822 limit: response.plan.usage.edit_predictions.limit,
823 amount: response.plan.usage.edit_predictions.used as i32,
824 }));
825 self.plan_info = Some(response.plan);
826 cx.emit(Event::PrivateUserInfoUpdated);
827 }
828
    /// Handles a message sent to the client by spawning a detached task whose
    /// errors are logged.
    ///
    /// For `MessageToClient::UserUpdated` the authenticated user is fetched
    /// again from the cloud client and applied via `update_authenticated_user`.
    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
        cx.spawn(async move |cx| {
            match message {
                MessageToClient::UserUpdated => {
                    // Fails if the store has been dropped or the client is gone.
                    let cloud_client = cx
                        .update(|cx| {
                            this.read_with(cx, |this, _cx| {
                                this.client.upgrade().map(|client| client.cloud_client())
                            })
                        })?
                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;

                    let response = cloud_client.get_authenticated_user().await?;
                    cx.update(|cx| {
                        this.update(cx, |this, cx| {
                            this.update_authenticated_user(response, cx);
                        })
                    })?;
                }
            }

            anyhow::Ok(())
        })
        .detach_and_log_err(cx);
    }
854
855 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
856 self.current_user.clone()
857 }
858
859 fn load_users(
860 &self,
861 request: impl RequestMessage<Response = UsersResponse>,
862 cx: &Context<Self>,
863 ) -> Task<Result<Vec<Arc<User>>>> {
864 let client = self.client.clone();
865 cx.spawn(async move |this, cx| {
866 if let Some(rpc) = client.upgrade() {
867 let response = rpc.request(request).await.context("error loading users")?;
868 let users = response.users;
869
870 this.update(cx, |this, _| this.insert(users))
871 } else {
872 Ok(Vec::new())
873 }
874 })
875 }
876
877 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
878 let mut ret = Vec::with_capacity(users.len());
879 for user in users {
880 let user = User::new(user);
881 if let Some(old) = self.users.insert(user.id, user.clone())
882 && old.github_login != user.github_login
883 {
884 self.by_github_login.remove(&old.github_login);
885 }
886 self.by_github_login
887 .insert(user.github_login.clone(), user.id);
888 ret.push(user)
889 }
890 ret
891 }
892
893 pub fn set_participant_indices(
894 &mut self,
895 participant_indices: HashMap<u64, ParticipantIndex>,
896 cx: &mut Context<Self>,
897 ) {
898 if participant_indices != self.participant_indices {
899 self.participant_indices = participant_indices;
900 cx.emit(Event::ParticipantIndicesChanged);
901 }
902 }
903
904 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
905 &self.participant_indices
906 }
907
908 pub fn participant_names(
909 &self,
910 user_ids: impl Iterator<Item = u64>,
911 cx: &App,
912 ) -> HashMap<u64, SharedString> {
913 let mut ret = HashMap::default();
914 let mut missing_user_ids = Vec::new();
915 for id in user_ids {
916 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
917 ret.insert(id, github_login);
918 } else {
919 missing_user_ids.push(id)
920 }
921 }
922 if !missing_user_ids.is_empty() {
923 let this = self.weak_self.clone();
924 cx.spawn(async move |cx| {
925 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
926 .await
927 })
928 .detach_and_log_err(cx);
929 }
930 ret
931 }
932}
933
934impl User {
935 fn new(message: proto::User) -> Arc<Self> {
936 Arc::new(User {
937 id: message.id,
938 github_login: message.github_login.into(),
939 avatar_uri: message.avatar_url.into(),
940 name: message.name,
941 })
942 }
943}
944
945impl Contact {
946 async fn from_proto(
947 contact: proto::Contact,
948 user_store: &Entity<UserStore>,
949 cx: &mut AsyncApp,
950 ) -> Result<Self> {
951 let user = user_store
952 .update(cx, |user_store, cx| {
953 user_store.get_user(contact.user_id, cx)
954 })
955 .await?;
956 Ok(Self {
957 user,
958 online: contact.online,
959 busy: contact.busy,
960 })
961 }
962}
963
964impl Collaborator {
965 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
966 Ok(Self {
967 peer_id: message.peer_id.context("invalid peer id")?,
968 replica_id: ReplicaId::new(message.replica_id as u16),
969 user_id: message.user_id as UserId,
970 is_host: message.is_host,
971 committer_name: message.committer_name,
972 committer_email: message.committer_email,
973 })
974 }
975}
976
977impl RequestUsage {
978 pub fn over_limit(&self) -> bool {
979 match self.limit {
980 UsageLimit::Limited(limit) => self.amount >= limit,
981 UsageLimit::Unlimited => false,
982 }
983 }
984
985 fn from_headers(
986 limit_name: &str,
987 amount_name: &str,
988 headers: &HeaderMap<HeaderValue>,
989 ) -> Result<Self> {
990 let limit = headers
991 .get(limit_name)
992 .with_context(|| format!("missing {limit_name:?} header"))?;
993 let limit = UsageLimit::from_str(limit.to_str()?)?;
994
995 let amount = headers
996 .get(amount_name)
997 .with_context(|| format!("missing {amount_name:?} header"))?;
998 let amount = amount.to_str()?.parse::<i32>()?;
999
1000 Ok(Self { limit, amount })
1001 }
1002}
1003
1004impl EditPredictionUsage {
1005 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1006 Ok(Self(RequestUsage::from_headers(
1007 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1008 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1009 headers,
1010 )?))
1011 }
1012}