1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{
6 GetAuthenticatedUserResponse, KnownOrUnknown, Organization, OrganizationId, Plan, PlanInfo,
7};
8use cloud_api_types::OrganizationConfiguration;
9use cloud_llm_client::{
10 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
11};
12use collections::{HashMap, HashSet, hash_map::Entry};
13use db::kvp::KeyValueStore;
14use derive_more::Deref;
15use feature_flags::FeatureFlagAppExt;
16use futures::{Future, StreamExt, channel::mpsc};
17use gpui::{
18 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, TaskExt,
19 WeakEntity,
20};
21use http_client::http::{HeaderMap, HeaderValue};
22use postage::{sink::Sink, watch};
23use rpc::proto::{RequestMessage, UsersResponse};
24use std::{
25 str::FromStr as _,
26 sync::{Arc, Weak},
27};
28use text::ReplicaId;
29use util::{ResultExt, TryFutureExt as _};
30
31const CURRENT_ORGANIZATION_ID_KEY: &str = "current_organization_id";
32
33pub type UserId = u64;
34
35#[derive(
36 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
37)]
38pub struct ChannelId(pub u64);
39
40impl std::fmt::Display for ChannelId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 self.0.fmt(f)
43 }
44}
45
46#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
47pub struct ProjectId(pub u64);
48
49impl ProjectId {
50 pub fn to_proto(self) -> u64 {
51 self.0
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct ParticipantIndex(pub u32);
57
58#[derive(Default, Debug)]
59pub struct User {
60 pub id: UserId,
61 pub github_login: SharedString,
62 pub avatar_uri: SharedUri,
63 pub name: Option<String>,
64}
65
66#[derive(Clone, Debug, PartialEq, Eq)]
67pub struct Collaborator {
68 pub peer_id: proto::PeerId,
69 pub replica_id: ReplicaId,
70 pub user_id: UserId,
71 pub is_host: bool,
72 pub committer_name: Option<String>,
73 pub committer_email: Option<String>,
74}
75
76impl PartialOrd for User {
77 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
78 Some(self.cmp(other))
79 }
80}
81
82impl Ord for User {
83 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
84 self.github_login.cmp(&other.github_login)
85 }
86}
87
88impl PartialEq for User {
89 fn eq(&self, other: &Self) -> bool {
90 self.id == other.id && self.github_login == other.github_login
91 }
92}
93
94impl Eq for User {}
95
96#[derive(Debug, PartialEq)]
97pub struct Contact {
98 pub user: Arc<User>,
99 pub online: bool,
100 pub busy: bool,
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
104pub enum ContactRequestStatus {
105 None,
106 RequestSent,
107 RequestReceived,
108 RequestAccepted,
109}
110
111pub struct UserStore {
112 users: HashMap<u64, Arc<User>>,
113 by_github_login: HashMap<SharedString, u64>,
114 participant_indices: HashMap<u64, ParticipantIndex>,
115 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
116 edit_prediction_usage: Option<EditPredictionUsage>,
117 plan_info: Option<PlanInfo>,
118 current_user: watch::Receiver<Option<Arc<User>>>,
119 current_organization: Option<Arc<Organization>>,
120 organizations: Vec<Arc<Organization>>,
121 plans_by_organization: HashMap<OrganizationId, Plan>,
122 configuration_by_organization: HashMap<OrganizationId, OrganizationConfiguration>,
123 contacts: Vec<Arc<Contact>>,
124 incoming_contact_requests: Vec<Arc<User>>,
125 outgoing_contact_requests: Vec<Arc<User>>,
126 pending_contact_requests: HashMap<u64, usize>,
127 client: Weak<Client>,
128 _maintain_contacts: Task<()>,
129 _maintain_current_user: Task<Result<()>>,
130 _handle_sign_out: Task<()>,
131 weak_self: WeakEntity<Self>,
132}
133
134#[derive(Clone)]
135pub struct InviteInfo {
136 pub count: u32,
137 pub url: Arc<str>,
138}
139
140pub enum Event {
141 Contact {
142 user: Arc<User>,
143 kind: ContactEventKind,
144 },
145 ShowContacts,
146 ParticipantIndicesChanged,
147 PrivateUserInfoUpdated,
148 PlanUpdated,
149 OrganizationChanged,
150}
151
152#[derive(Clone, Copy)]
153pub enum ContactEventKind {
154 Requested,
155 Accepted,
156 Cancelled,
157}
158
159impl EventEmitter<Event> for UserStore {}
160
161enum UpdateContacts {
162 Update(proto::UpdateContacts),
163 Wait(postage::barrier::Sender),
164 Clear(postage::barrier::Sender),
165}
166
167#[derive(Debug, Clone, Copy, Deref)]
168pub struct EditPredictionUsage(pub RequestUsage);
169
170#[derive(Debug, Clone, Copy)]
171pub struct RequestUsage {
172 pub limit: UsageLimit,
173 pub amount: i32,
174}
175
176impl UserStore {
177 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
178 let (mut current_user_tx, current_user_rx) = watch::channel();
179 let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
180 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
181 let rpc_subscriptions = vec![
182 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
183 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
184 ];
185
186 client.sign_out_tx.lock().replace(sign_out_tx);
187 client.add_message_to_client_handler({
188 let this = cx.weak_entity();
189 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
190 });
191
192 Self {
193 users: Default::default(),
194 by_github_login: Default::default(),
195 current_user: current_user_rx,
196 current_organization: None,
197 organizations: Vec::new(),
198 plans_by_organization: HashMap::default(),
199 configuration_by_organization: HashMap::default(),
200 plan_info: None,
201 edit_prediction_usage: None,
202 contacts: Default::default(),
203 incoming_contact_requests: Default::default(),
204 participant_indices: Default::default(),
205 outgoing_contact_requests: Default::default(),
206 client: Arc::downgrade(&client),
207 update_contacts_tx,
208 _maintain_contacts: cx.spawn(async move |this, cx| {
209 let _subscriptions = rpc_subscriptions;
210 while let Some(message) = update_contacts_rx.next().await {
211 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
212 {
213 task.log_err().await;
214 } else {
215 break;
216 }
217 }
218 }),
219 _maintain_current_user: cx.spawn(async move |this, cx| {
220 let mut status = client.status();
221 let weak = Arc::downgrade(&client);
222 drop(client);
223 while let Some(status) = status.next().await {
224 // if the client is dropped, the app is shutting down.
225 let Some(client) = weak.upgrade() else {
226 return Ok(());
227 };
228 match status {
229 Status::Authenticated
230 | Status::Reauthenticated
231 | Status::Connected { .. } => {
232 if let Some(user_id) = client.user_id() {
233 let response = client
234 .cloud_client()
235 .get_authenticated_user()
236 .await
237 .log_err();
238
239 let current_user_and_response = if let Some(response) = response {
240 let user = Arc::new(User {
241 id: user_id,
242 github_login: response.user.github_login.clone().into(),
243 avatar_uri: response.user.avatar_url.clone().into(),
244 name: response.user.name.clone(),
245 });
246
247 Some((user, response))
248 } else {
249 None
250 };
251 current_user_tx
252 .send(
253 current_user_and_response
254 .as_ref()
255 .map(|(user, _)| user.clone()),
256 )
257 .await
258 .ok();
259
260 cx.update(|cx| {
261 if let Some((user, response)) = current_user_and_response {
262 this.update(cx, |this, cx| {
263 this.by_github_login
264 .insert(user.github_login.clone(), user_id);
265 this.users.insert(user_id, user);
266 this.update_authenticated_user(response, cx)
267 })
268 } else {
269 anyhow::Ok(())
270 }
271 })?;
272
273 this.update(cx, |_, cx| cx.notify())?;
274 }
275 }
276 Status::SignedOut => {
277 current_user_tx.send(None).await.ok();
278 this.update(cx, |this, cx| {
279 this.clear_organizations();
280 this.clear_plan_and_usage();
281 cx.emit(Event::PrivateUserInfoUpdated);
282 cx.notify();
283 this.clear_contacts()
284 })?
285 .await;
286 }
287 Status::ConnectionLost => {
288 this.update(cx, |this, cx| {
289 cx.notify();
290 this.clear_contacts()
291 })?
292 .await;
293 }
294 _ => {}
295 }
296 }
297 Ok(())
298 }),
299 _handle_sign_out: cx.spawn(async move |this, cx| {
300 while let Some(()) = sign_out_rx.next().await {
301 let Some(client) = this
302 .read_with(cx, |this, _cx| this.client.upgrade())
303 .ok()
304 .flatten()
305 else {
306 break;
307 };
308
309 client.sign_out(cx).await;
310 }
311 }),
312 pending_contact_requests: Default::default(),
313 weak_self: cx.weak_entity(),
314 }
315 }
316
317 #[cfg(feature = "test-support")]
318 pub fn clear_cache(&mut self) {
319 self.users.clear();
320 self.by_github_login.clear();
321 }
322
323 async fn handle_show_contacts(
324 this: Entity<Self>,
325 _: TypedEnvelope<proto::ShowContacts>,
326 mut cx: AsyncApp,
327 ) -> Result<()> {
328 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
329 Ok(())
330 }
331
332 async fn handle_update_contacts(
333 this: Entity<Self>,
334 message: TypedEnvelope<proto::UpdateContacts>,
335 cx: AsyncApp,
336 ) -> Result<()> {
337 this.read_with(&cx, |this, _| {
338 this.update_contacts_tx
339 .unbounded_send(UpdateContacts::Update(message.payload))
340 .unwrap();
341 });
342 Ok(())
343 }
344
345 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
346 match message {
347 UpdateContacts::Wait(barrier) => {
348 drop(barrier);
349 Task::ready(Ok(()))
350 }
351 UpdateContacts::Clear(barrier) => {
352 self.contacts.clear();
353 self.incoming_contact_requests.clear();
354 self.outgoing_contact_requests.clear();
355 drop(barrier);
356 Task::ready(Ok(()))
357 }
358 UpdateContacts::Update(message) => {
359 let mut user_ids = HashSet::default();
360 for contact in &message.contacts {
361 user_ids.insert(contact.user_id);
362 }
363 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
364 user_ids.extend(message.outgoing_requests.iter());
365
366 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
367 cx.spawn(async move |this, cx| {
368 load_users.await?;
369
370 // Users are fetched in parallel above and cached in call to get_users
371 // No need to parallelize here
372 let mut updated_contacts = Vec::new();
373 let this = this.upgrade().context("can't upgrade user store handle")?;
374 for contact in message.contacts {
375 updated_contacts
376 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
377 }
378
379 let mut incoming_requests = Vec::new();
380 for request in message.incoming_requests {
381 incoming_requests.push({
382 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
383 .await?
384 });
385 }
386
387 let mut outgoing_requests = Vec::new();
388 for requested_user_id in message.outgoing_requests {
389 outgoing_requests.push(
390 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
391 .await?,
392 );
393 }
394
395 let removed_contacts =
396 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
397 let removed_incoming_requests =
398 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
399 let removed_outgoing_requests =
400 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
401
402 this.update(cx, |this, cx| {
403 // Remove contacts
404 this.contacts
405 .retain(|contact| !removed_contacts.contains(&contact.user.id));
406 // Update existing contacts and insert new ones
407 for updated_contact in updated_contacts {
408 match this.contacts.binary_search_by_key(
409 &&updated_contact.user.github_login,
410 |contact| &contact.user.github_login,
411 ) {
412 Ok(ix) => this.contacts[ix] = updated_contact,
413 Err(ix) => this.contacts.insert(ix, updated_contact),
414 }
415 }
416
417 // Remove incoming contact requests
418 this.incoming_contact_requests.retain(|user| {
419 if removed_incoming_requests.contains(&user.id) {
420 cx.emit(Event::Contact {
421 user: user.clone(),
422 kind: ContactEventKind::Cancelled,
423 });
424 false
425 } else {
426 true
427 }
428 });
429 // Update existing incoming requests and insert new ones
430 for user in incoming_requests {
431 match this
432 .incoming_contact_requests
433 .binary_search_by_key(&&user.github_login, |contact| {
434 &contact.github_login
435 }) {
436 Ok(ix) => this.incoming_contact_requests[ix] = user,
437 Err(ix) => this.incoming_contact_requests.insert(ix, user),
438 }
439 }
440
441 // Remove outgoing contact requests
442 this.outgoing_contact_requests
443 .retain(|user| !removed_outgoing_requests.contains(&user.id));
444 // Update existing incoming requests and insert new ones
445 for request in outgoing_requests {
446 match this
447 .outgoing_contact_requests
448 .binary_search_by_key(&&request.github_login, |contact| {
449 &contact.github_login
450 }) {
451 Ok(ix) => this.outgoing_contact_requests[ix] = request,
452 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
453 }
454 }
455
456 cx.notify();
457 });
458
459 Ok(())
460 })
461 }
462 }
463 }
464
465 pub fn contacts(&self) -> &[Arc<Contact>] {
466 &self.contacts
467 }
468
469 pub fn has_contact(&self, user: &Arc<User>) -> bool {
470 self.contacts
471 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
472 .is_ok()
473 }
474
475 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
476 &self.incoming_contact_requests
477 }
478
479 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
480 &self.outgoing_contact_requests
481 }
482
483 pub fn is_contact_request_pending(&self, user: &User) -> bool {
484 self.pending_contact_requests.contains_key(&user.id)
485 }
486
487 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
488 if self
489 .contacts
490 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
491 .is_ok()
492 {
493 ContactRequestStatus::RequestAccepted
494 } else if self
495 .outgoing_contact_requests
496 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
497 .is_ok()
498 {
499 ContactRequestStatus::RequestSent
500 } else if self
501 .incoming_contact_requests
502 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
503 .is_ok()
504 {
505 ContactRequestStatus::RequestReceived
506 } else {
507 ContactRequestStatus::None
508 }
509 }
510
511 pub fn request_contact(
512 &mut self,
513 responder_id: u64,
514 cx: &mut Context<Self>,
515 ) -> Task<Result<()>> {
516 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
517 }
518
519 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
520 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
521 }
522
523 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
524 self.incoming_contact_requests
525 .iter()
526 .any(|user| user.id == user_id)
527 }
528
529 pub fn respond_to_contact_request(
530 &mut self,
531 requester_id: u64,
532 accept: bool,
533 cx: &mut Context<Self>,
534 ) -> Task<Result<()>> {
535 self.perform_contact_request(
536 requester_id,
537 proto::RespondToContactRequest {
538 requester_id,
539 response: if accept {
540 proto::ContactRequestResponse::Accept
541 } else {
542 proto::ContactRequestResponse::Decline
543 } as i32,
544 },
545 cx,
546 )
547 }
548
549 pub fn dismiss_contact_request(
550 &self,
551 requester_id: u64,
552 cx: &Context<Self>,
553 ) -> Task<Result<()>> {
554 let client = self.client.upgrade();
555 cx.spawn(async move |_, _| {
556 client
557 .context("can't upgrade client reference")?
558 .request(proto::RespondToContactRequest {
559 requester_id,
560 response: proto::ContactRequestResponse::Dismiss as i32,
561 })
562 .await?;
563 Ok(())
564 })
565 }
566
567 fn perform_contact_request<T: RequestMessage>(
568 &mut self,
569 user_id: u64,
570 request: T,
571 cx: &mut Context<Self>,
572 ) -> Task<Result<()>> {
573 let client = self.client.upgrade();
574 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
575 cx.notify();
576
577 cx.spawn(async move |this, cx| {
578 let response = client
579 .context("can't upgrade client reference")?
580 .request(request)
581 .await;
582 this.update(cx, |this, cx| {
583 if let Entry::Occupied(mut request_count) =
584 this.pending_contact_requests.entry(user_id)
585 {
586 *request_count.get_mut() -= 1;
587 if *request_count.get() == 0 {
588 request_count.remove();
589 }
590 }
591 cx.notify();
592 })?;
593 response?;
594 Ok(())
595 })
596 }
597
598 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
599 let (tx, mut rx) = postage::barrier::channel();
600 self.update_contacts_tx
601 .unbounded_send(UpdateContacts::Clear(tx))
602 .unwrap();
603 async move {
604 rx.next().await;
605 }
606 }
607
608 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
609 let (tx, mut rx) = postage::barrier::channel();
610 self.update_contacts_tx
611 .unbounded_send(UpdateContacts::Wait(tx))
612 .unwrap();
613 async move {
614 rx.next().await;
615 }
616 }
617
618 pub fn get_users(
619 &self,
620 user_ids: Vec<u64>,
621 cx: &Context<Self>,
622 ) -> Task<Result<Vec<Arc<User>>>> {
623 let mut user_ids_to_fetch = user_ids.clone();
624 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
625
626 cx.spawn(async move |this, cx| {
627 if !user_ids_to_fetch.is_empty() {
628 this.update(cx, |this, cx| {
629 this.load_users(
630 proto::GetUsers {
631 user_ids: user_ids_to_fetch,
632 },
633 cx,
634 )
635 })?
636 .await?;
637 }
638
639 this.read_with(cx, |this, _| {
640 user_ids
641 .iter()
642 .map(|user_id| {
643 this.users
644 .get(user_id)
645 .cloned()
646 .with_context(|| format!("user {user_id} not found"))
647 })
648 .collect()
649 })?
650 })
651 }
652
653 pub fn fuzzy_search_users(
654 &self,
655 query: String,
656 cx: &Context<Self>,
657 ) -> Task<Result<Vec<Arc<User>>>> {
658 self.load_users(proto::FuzzySearchUsers { query }, cx)
659 }
660
661 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
662 self.users.get(&user_id).cloned()
663 }
664
665 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
666 if let Some(user) = self.users.get(&user_id).cloned() {
667 return Some(user);
668 }
669
670 self.get_user(user_id, cx).detach_and_log_err(cx);
671 None
672 }
673
674 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
675 if let Some(user) = self.users.get(&user_id).cloned() {
676 return Task::ready(Ok(user));
677 }
678
679 let load_users = self.get_users(vec![user_id], cx);
680 cx.spawn(async move |this, cx| {
681 load_users.await?;
682 this.read_with(cx, |this, _| {
683 this.users
684 .get(&user_id)
685 .cloned()
686 .context("server responded with no users")
687 })?
688 })
689 }
690
691 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
692 self.by_github_login
693 .get(github_login)
694 .and_then(|id| self.users.get(id).cloned())
695 }
696
697 pub fn current_user(&self) -> Option<Arc<User>> {
698 self.current_user.borrow().clone()
699 }
700
701 pub fn current_organization(&self) -> Option<Arc<Organization>> {
702 self.current_organization.clone()
703 }
704
705 pub fn set_current_organization(
706 &mut self,
707 organization: Arc<Organization>,
708 cx: &mut Context<Self>,
709 ) {
710 let is_same_organization = self
711 .current_organization
712 .as_ref()
713 .is_some_and(|current| current.id == organization.id);
714
715 if !is_same_organization {
716 let organization_id = organization.id.0.to_string();
717 self.current_organization.replace(organization);
718 cx.emit(Event::OrganizationChanged);
719 cx.notify();
720
721 let kvp = KeyValueStore::global(cx);
722 db::write_and_log(cx, move || async move {
723 kvp.write_kvp(CURRENT_ORGANIZATION_ID_KEY.into(), organization_id)
724 .await
725 });
726 }
727 }
728
729 pub fn organizations(&self) -> &Vec<Arc<Organization>> {
730 &self.organizations
731 }
732
733 pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
734 self.plans_by_organization.get(organization_id).copied()
735 }
736
737 pub fn current_organization_configuration(&self) -> Option<&OrganizationConfiguration> {
738 let current_organization = self.current_organization.as_ref()?;
739
740 self.configuration_by_organization
741 .get(¤t_organization.id)
742 }
743
744 pub fn plan(&self) -> Option<Plan> {
745 #[cfg(debug_assertions)]
746 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
747 use cloud_api_client::Plan;
748
749 return match plan.as_str() {
750 "free" => Some(Plan::ZedFree),
751 "trial" => Some(Plan::ZedProTrial),
752 "pro" => Some(Plan::ZedPro),
753 _ => {
754 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
755 }
756 };
757 }
758
759 if let Some(organization) = &self.current_organization
760 && let Some(plan) = self.plan_for_organization(&organization.id)
761 {
762 return Some(plan);
763 }
764
765 self.plan_info.as_ref().map(|info| info.plan())
766 }
767
768 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
769 self.plan_info
770 .as_ref()
771 .and_then(|plan| plan.subscription_period)
772 .map(|subscription_period| {
773 (
774 subscription_period.started_at.0,
775 subscription_period.ended_at.0,
776 )
777 })
778 }
779
780 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
781 self.plan_info
782 .as_ref()
783 .and_then(|plan| plan.trial_started_at)
784 .map(|trial_started_at| trial_started_at.0)
785 }
786
787 /// Returns whether the user's account is too new to use the service.
788 pub fn account_too_young(&self) -> bool {
789 self.plan_info
790 .as_ref()
791 .map(|plan| plan.is_account_too_young)
792 .unwrap_or_default()
793 }
794
795 /// Returns whether the current user has overdue invoices and usage should be blocked.
796 pub fn has_overdue_invoices(&self) -> bool {
797 self.plan_info
798 .as_ref()
799 .map(|plan| plan.has_overdue_invoices)
800 .unwrap_or_default()
801 }
802
803 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
804 self.edit_prediction_usage
805 }
806
807 pub fn update_edit_prediction_usage(
808 &mut self,
809 usage: EditPredictionUsage,
810 cx: &mut Context<Self>,
811 ) {
812 self.edit_prediction_usage = Some(usage);
813 cx.notify();
814 }
815
816 pub fn clear_organizations(&mut self) {
817 self.organizations.clear();
818 self.current_organization = None;
819 }
820
821 pub fn clear_plan_and_usage(&mut self) {
822 self.plan_info = None;
823 self.edit_prediction_usage = None;
824 }
825
826 fn update_authenticated_user(
827 &mut self,
828 response: GetAuthenticatedUserResponse,
829 cx: &mut Context<Self>,
830 ) {
831 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
832 cx.update_flags(staff, response.feature_flags);
833 if let Some(client) = self.client.upgrade() {
834 client
835 .telemetry
836 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
837 }
838
839 self.organizations = response.organizations.into_iter().map(Arc::new).collect();
840 let persisted_org_id = KeyValueStore::global(cx)
841 .read_kvp(CURRENT_ORGANIZATION_ID_KEY)
842 .log_err()
843 .flatten()
844 .map(|id| OrganizationId(Arc::from(id)));
845
846 self.current_organization = persisted_org_id
847 .and_then(|persisted_id| {
848 self.organizations
849 .iter()
850 .find(|org| org.id == persisted_id)
851 .cloned()
852 })
853 .or_else(|| {
854 response
855 .default_organization_id
856 .and_then(|default_organization_id| {
857 self.organizations
858 .iter()
859 .find(|organization| organization.id == default_organization_id)
860 .cloned()
861 })
862 })
863 .or_else(|| self.organizations.first().cloned());
864 self.plans_by_organization = response
865 .plans_by_organization
866 .into_iter()
867 .map(|(organization_id, plan)| {
868 let plan = match plan {
869 KnownOrUnknown::Known(plan) => plan,
870 KnownOrUnknown::Unknown(_) => {
871 // If we get a plan that we don't recognize, fall back to the Free plan.
872 Plan::ZedFree
873 }
874 };
875
876 (organization_id, plan)
877 })
878 .collect();
879 self.configuration_by_organization =
880 response.configuration_by_organization.into_iter().collect();
881
882 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
883 limit: response.plan.usage.edit_predictions.limit,
884 amount: response.plan.usage.edit_predictions.used as i32,
885 }));
886 self.plan_info = Some(response.plan);
887 cx.emit(Event::PrivateUserInfoUpdated);
888 }
889
890 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
891 cx.spawn(async move |cx| {
892 match message {
893 MessageToClient::UserUpdated => {
894 let cloud_client = cx
895 .update(|cx| {
896 this.read_with(cx, |this, _cx| {
897 this.client.upgrade().map(|client| client.cloud_client())
898 })
899 })?
900 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
901
902 let response = cloud_client.get_authenticated_user().await?;
903 cx.update(|cx| {
904 this.update(cx, |this, cx| {
905 this.update_authenticated_user(response, cx);
906 })
907 })?;
908 }
909 }
910
911 anyhow::Ok(())
912 })
913 .detach_and_log_err(cx);
914 }
915
916 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
917 self.current_user.clone()
918 }
919
920 fn load_users(
921 &self,
922 request: impl RequestMessage<Response = UsersResponse>,
923 cx: &Context<Self>,
924 ) -> Task<Result<Vec<Arc<User>>>> {
925 let client = self.client.clone();
926 cx.spawn(async move |this, cx| {
927 if let Some(rpc) = client.upgrade() {
928 let response = rpc.request(request).await.context("error loading users")?;
929 let users = response.users;
930
931 this.update(cx, |this, _| this.insert(users))
932 } else {
933 Ok(Vec::new())
934 }
935 })
936 }
937
938 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
939 let mut ret = Vec::with_capacity(users.len());
940 for user in users {
941 let user = User::new(user);
942 if let Some(old) = self.users.insert(user.id, user.clone())
943 && old.github_login != user.github_login
944 {
945 self.by_github_login.remove(&old.github_login);
946 }
947 self.by_github_login
948 .insert(user.github_login.clone(), user.id);
949 ret.push(user)
950 }
951 ret
952 }
953
954 pub fn set_participant_indices(
955 &mut self,
956 participant_indices: HashMap<u64, ParticipantIndex>,
957 cx: &mut Context<Self>,
958 ) {
959 if participant_indices != self.participant_indices {
960 self.participant_indices = participant_indices;
961 cx.emit(Event::ParticipantIndicesChanged);
962 }
963 }
964
965 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
966 &self.participant_indices
967 }
968
969 pub fn participant_names(
970 &self,
971 user_ids: impl Iterator<Item = u64>,
972 cx: &App,
973 ) -> HashMap<u64, SharedString> {
974 let mut ret = HashMap::default();
975 let mut missing_user_ids = Vec::new();
976 for id in user_ids {
977 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
978 ret.insert(id, github_login);
979 } else {
980 missing_user_ids.push(id)
981 }
982 }
983 if !missing_user_ids.is_empty() {
984 let this = self.weak_self.clone();
985 cx.spawn(async move |cx| {
986 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
987 .await
988 })
989 .detach_and_log_err(cx);
990 }
991 ret
992 }
993}
994
995impl User {
996 fn new(message: proto::User) -> Arc<Self> {
997 Arc::new(User {
998 id: message.id,
999 github_login: message.github_login.into(),
1000 avatar_uri: message.avatar_url.into(),
1001 name: message.name,
1002 })
1003 }
1004}
1005
1006impl Contact {
1007 async fn from_proto(
1008 contact: proto::Contact,
1009 user_store: &Entity<UserStore>,
1010 cx: &mut AsyncApp,
1011 ) -> Result<Self> {
1012 let user = user_store
1013 .update(cx, |user_store, cx| {
1014 user_store.get_user(contact.user_id, cx)
1015 })
1016 .await?;
1017 Ok(Self {
1018 user,
1019 online: contact.online,
1020 busy: contact.busy,
1021 })
1022 }
1023}
1024
1025impl Collaborator {
1026 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
1027 Ok(Self {
1028 peer_id: message.peer_id.context("invalid peer id")?,
1029 replica_id: ReplicaId::new(message.replica_id as u16),
1030 user_id: message.user_id as UserId,
1031 is_host: message.is_host,
1032 committer_name: message.committer_name,
1033 committer_email: message.committer_email,
1034 })
1035 }
1036}
1037
1038impl RequestUsage {
1039 pub fn over_limit(&self) -> bool {
1040 match self.limit {
1041 UsageLimit::Limited(limit) => self.amount >= limit,
1042 UsageLimit::Unlimited => false,
1043 }
1044 }
1045
1046 fn from_headers(
1047 limit_name: &str,
1048 amount_name: &str,
1049 headers: &HeaderMap<HeaderValue>,
1050 ) -> Result<Self> {
1051 let limit = headers
1052 .get(limit_name)
1053 .with_context(|| format!("missing {limit_name:?} header"))?;
1054 let limit = UsageLimit::from_str(limit.to_str()?)?;
1055
1056 let amount = headers
1057 .get(amount_name)
1058 .with_context(|| format!("missing {amount_name:?} header"))?;
1059 let amount = amount.to_str()?.parse::<i32>()?;
1060
1061 Ok(Self { limit, amount })
1062 }
1063}
1064
1065impl EditPredictionUsage {
1066 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1067 Ok(Self(RequestUsage::from_headers(
1068 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1069 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1070 headers,
1071 )?))
1072 }
1073}