1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{
6 GetAuthenticatedUserResponse, KnownOrUnknown, Organization, OrganizationId, Plan, PlanInfo,
7};
8use cloud_api_types::OrganizationConfiguration;
9use cloud_llm_client::{
10 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
11};
12use collections::{HashMap, HashSet, hash_map::Entry};
13use db::kvp::KeyValueStore;
14use derive_more::Deref;
15use feature_flags::FeatureFlagAppExt;
16use futures::{Future, StreamExt, channel::mpsc};
17use gpui::{
18 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
19};
20use http_client::http::{HeaderMap, HeaderValue};
21use postage::{sink::Sink, watch};
22use rpc::proto::{RequestMessage, UsersResponse};
23use std::{
24 str::FromStr as _,
25 sync::{Arc, Weak},
26};
27use text::ReplicaId;
28use util::{ResultExt, TryFutureExt as _};
29
/// Key-value store key under which the id of the currently selected organization is
/// persisted (written when the organization changes, read back after authentication).
const CURRENT_ORGANIZATION_ID_KEY: &str = "current_organization_id";

/// Numeric identifier of a user.
pub type UserId = u64;
33
34#[derive(
35 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
36)]
37pub struct ChannelId(pub u64);
38
39impl std::fmt::Display for ChannelId {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 self.0.fmt(f)
42 }
43}
44
/// Identifier of a project, wrapping the raw `u64` id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the raw numeric id.
    pub fn to_proto(self) -> u64 {
        let Self(raw_id) = self;
        raw_id
    }
}
53
/// Index assigned to a participant, wrapping a raw `u32`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
56
57#[derive(Default, Debug)]
58pub struct User {
59 pub id: UserId,
60 pub github_login: SharedString,
61 pub avatar_uri: SharedUri,
62 pub name: Option<String>,
63}
64
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct Collaborator {
67 pub peer_id: proto::PeerId,
68 pub replica_id: ReplicaId,
69 pub user_id: UserId,
70 pub is_host: bool,
71 pub committer_name: Option<String>,
72 pub committer_email: Option<String>,
73}
74
75impl PartialOrd for User {
76 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
77 Some(self.cmp(other))
78 }
79}
80
81impl Ord for User {
82 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
83 self.github_login.cmp(&other.github_login)
84 }
85}
86
87impl PartialEq for User {
88 fn eq(&self, other: &Self) -> bool {
89 self.id == other.id && self.github_login == other.github_login
90 }
91}
92
93impl Eq for User {}
94
95#[derive(Debug, PartialEq)]
96pub struct Contact {
97 pub user: Arc<User>,
98 pub online: bool,
99 pub busy: bool,
100}
101
/// Relationship between the current user and another user, as far as contact requests go.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// Not a contact and no request in either direction.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
109
110pub struct UserStore {
111 users: HashMap<u64, Arc<User>>,
112 by_github_login: HashMap<SharedString, u64>,
113 participant_indices: HashMap<u64, ParticipantIndex>,
114 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
115 edit_prediction_usage: Option<EditPredictionUsage>,
116 plan_info: Option<PlanInfo>,
117 current_user: watch::Receiver<Option<Arc<User>>>,
118 current_organization: Option<Arc<Organization>>,
119 organizations: Vec<Arc<Organization>>,
120 plans_by_organization: HashMap<OrganizationId, Plan>,
121 configuration_by_organization: HashMap<OrganizationId, OrganizationConfiguration>,
122 contacts: Vec<Arc<Contact>>,
123 incoming_contact_requests: Vec<Arc<User>>,
124 outgoing_contact_requests: Vec<Arc<User>>,
125 pending_contact_requests: HashMap<u64, usize>,
126 client: Weak<Client>,
127 _maintain_contacts: Task<()>,
128 _maintain_current_user: Task<Result<()>>,
129 _handle_sign_out: Task<()>,
130 weak_self: WeakEntity<Self>,
131}
132
/// Invite information: a count and a URL.
// NOTE(review): not used elsewhere in the visible part of this file; presumably the number of
// available invites and the link to share — confirm against callers.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
138
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different set of indices.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user's information is updated, and on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): not emitted anywhere in the visible part of this file.
    PlanUpdated,
    /// Emitted when `set_current_organization` switches to a different organization.
    OrganizationChanged,
}
150
/// Kind of change reported by [`Event::Contact`].
#[derive(Copy, Clone)]
pub enum ContactEventKind {
    /// A contact request was made.
    Requested,
    /// A contact request was accepted.
    Accepted,
    /// A contact request was cancelled.
    Cancelled,
}
157
// Lets `UserStore` emit `Event` values through its context.
impl EventEmitter<Event> for UserStore {}
159
/// Messages processed, in order, by the contact maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once every earlier message has been handled.
    Wait(postage::barrier::Sender),
    /// Clear contacts and contact requests, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
165
166#[derive(Debug, Clone, Copy, Deref)]
167pub struct EditPredictionUsage(pub RequestUsage);
168
169#[derive(Debug, Clone, Copy)]
170pub struct RequestUsage {
171 pub limit: UsageLimit,
172 pub amount: i32,
173}
174
175impl UserStore {
176 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
177 let (mut current_user_tx, current_user_rx) = watch::channel();
178 let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
179 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
180 let rpc_subscriptions = vec![
181 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
182 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
183 ];
184
185 client.sign_out_tx.lock().replace(sign_out_tx);
186 client.add_message_to_client_handler({
187 let this = cx.weak_entity();
188 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
189 });
190
191 Self {
192 users: Default::default(),
193 by_github_login: Default::default(),
194 current_user: current_user_rx,
195 current_organization: None,
196 organizations: Vec::new(),
197 plans_by_organization: HashMap::default(),
198 configuration_by_organization: HashMap::default(),
199 plan_info: None,
200 edit_prediction_usage: None,
201 contacts: Default::default(),
202 incoming_contact_requests: Default::default(),
203 participant_indices: Default::default(),
204 outgoing_contact_requests: Default::default(),
205 client: Arc::downgrade(&client),
206 update_contacts_tx,
207 _maintain_contacts: cx.spawn(async move |this, cx| {
208 let _subscriptions = rpc_subscriptions;
209 while let Some(message) = update_contacts_rx.next().await {
210 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
211 {
212 task.log_err().await;
213 } else {
214 break;
215 }
216 }
217 }),
218 _maintain_current_user: cx.spawn(async move |this, cx| {
219 let mut status = client.status();
220 let weak = Arc::downgrade(&client);
221 drop(client);
222 while let Some(status) = status.next().await {
223 // if the client is dropped, the app is shutting down.
224 let Some(client) = weak.upgrade() else {
225 return Ok(());
226 };
227 match status {
228 Status::Authenticated
229 | Status::Reauthenticated
230 | Status::Connected { .. } => {
231 if let Some(user_id) = client.user_id() {
232 let response = client
233 .cloud_client()
234 .get_authenticated_user()
235 .await
236 .log_err();
237
238 let current_user_and_response = if let Some(response) = response {
239 let user = Arc::new(User {
240 id: user_id,
241 github_login: response.user.github_login.clone().into(),
242 avatar_uri: response.user.avatar_url.clone().into(),
243 name: response.user.name.clone(),
244 });
245
246 Some((user, response))
247 } else {
248 None
249 };
250 current_user_tx
251 .send(
252 current_user_and_response
253 .as_ref()
254 .map(|(user, _)| user.clone()),
255 )
256 .await
257 .ok();
258
259 cx.update(|cx| {
260 if let Some((user, response)) = current_user_and_response {
261 this.update(cx, |this, cx| {
262 this.by_github_login
263 .insert(user.github_login.clone(), user_id);
264 this.users.insert(user_id, user);
265 this.update_authenticated_user(response, cx)
266 })
267 } else {
268 anyhow::Ok(())
269 }
270 })?;
271
272 this.update(cx, |_, cx| cx.notify())?;
273 }
274 }
275 Status::SignedOut => {
276 current_user_tx.send(None).await.ok();
277 this.update(cx, |this, cx| {
278 this.clear_organizations();
279 this.clear_plan_and_usage();
280 cx.emit(Event::PrivateUserInfoUpdated);
281 cx.notify();
282 this.clear_contacts()
283 })?
284 .await;
285 }
286 Status::ConnectionLost => {
287 this.update(cx, |this, cx| {
288 cx.notify();
289 this.clear_contacts()
290 })?
291 .await;
292 }
293 _ => {}
294 }
295 }
296 Ok(())
297 }),
298 _handle_sign_out: cx.spawn(async move |this, cx| {
299 while let Some(()) = sign_out_rx.next().await {
300 let Some(client) = this
301 .read_with(cx, |this, _cx| this.client.upgrade())
302 .ok()
303 .flatten()
304 else {
305 break;
306 };
307
308 client.sign_out(cx).await;
309 }
310 }),
311 pending_contact_requests: Default::default(),
312 weak_self: cx.weak_entity(),
313 }
314 }
315
316 #[cfg(feature = "test-support")]
317 pub fn clear_cache(&mut self) {
318 self.users.clear();
319 self.by_github_login.clear();
320 }
321
322 async fn handle_show_contacts(
323 this: Entity<Self>,
324 _: TypedEnvelope<proto::ShowContacts>,
325 mut cx: AsyncApp,
326 ) -> Result<()> {
327 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
328 Ok(())
329 }
330
331 async fn handle_update_contacts(
332 this: Entity<Self>,
333 message: TypedEnvelope<proto::UpdateContacts>,
334 cx: AsyncApp,
335 ) -> Result<()> {
336 this.read_with(&cx, |this, _| {
337 this.update_contacts_tx
338 .unbounded_send(UpdateContacts::Update(message.payload))
339 .unwrap();
340 });
341 Ok(())
342 }
343
344 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
345 match message {
346 UpdateContacts::Wait(barrier) => {
347 drop(barrier);
348 Task::ready(Ok(()))
349 }
350 UpdateContacts::Clear(barrier) => {
351 self.contacts.clear();
352 self.incoming_contact_requests.clear();
353 self.outgoing_contact_requests.clear();
354 drop(barrier);
355 Task::ready(Ok(()))
356 }
357 UpdateContacts::Update(message) => {
358 let mut user_ids = HashSet::default();
359 for contact in &message.contacts {
360 user_ids.insert(contact.user_id);
361 }
362 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
363 user_ids.extend(message.outgoing_requests.iter());
364
365 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
366 cx.spawn(async move |this, cx| {
367 load_users.await?;
368
369 // Users are fetched in parallel above and cached in call to get_users
370 // No need to parallelize here
371 let mut updated_contacts = Vec::new();
372 let this = this.upgrade().context("can't upgrade user store handle")?;
373 for contact in message.contacts {
374 updated_contacts
375 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
376 }
377
378 let mut incoming_requests = Vec::new();
379 for request in message.incoming_requests {
380 incoming_requests.push({
381 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
382 .await?
383 });
384 }
385
386 let mut outgoing_requests = Vec::new();
387 for requested_user_id in message.outgoing_requests {
388 outgoing_requests.push(
389 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
390 .await?,
391 );
392 }
393
394 let removed_contacts =
395 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
396 let removed_incoming_requests =
397 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
398 let removed_outgoing_requests =
399 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
400
401 this.update(cx, |this, cx| {
402 // Remove contacts
403 this.contacts
404 .retain(|contact| !removed_contacts.contains(&contact.user.id));
405 // Update existing contacts and insert new ones
406 for updated_contact in updated_contacts {
407 match this.contacts.binary_search_by_key(
408 &&updated_contact.user.github_login,
409 |contact| &contact.user.github_login,
410 ) {
411 Ok(ix) => this.contacts[ix] = updated_contact,
412 Err(ix) => this.contacts.insert(ix, updated_contact),
413 }
414 }
415
416 // Remove incoming contact requests
417 this.incoming_contact_requests.retain(|user| {
418 if removed_incoming_requests.contains(&user.id) {
419 cx.emit(Event::Contact {
420 user: user.clone(),
421 kind: ContactEventKind::Cancelled,
422 });
423 false
424 } else {
425 true
426 }
427 });
428 // Update existing incoming requests and insert new ones
429 for user in incoming_requests {
430 match this
431 .incoming_contact_requests
432 .binary_search_by_key(&&user.github_login, |contact| {
433 &contact.github_login
434 }) {
435 Ok(ix) => this.incoming_contact_requests[ix] = user,
436 Err(ix) => this.incoming_contact_requests.insert(ix, user),
437 }
438 }
439
440 // Remove outgoing contact requests
441 this.outgoing_contact_requests
442 .retain(|user| !removed_outgoing_requests.contains(&user.id));
443 // Update existing incoming requests and insert new ones
444 for request in outgoing_requests {
445 match this
446 .outgoing_contact_requests
447 .binary_search_by_key(&&request.github_login, |contact| {
448 &contact.github_login
449 }) {
450 Ok(ix) => this.outgoing_contact_requests[ix] = request,
451 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
452 }
453 }
454
455 cx.notify();
456 });
457
458 Ok(())
459 })
460 }
461 }
462 }
463
464 pub fn contacts(&self) -> &[Arc<Contact>] {
465 &self.contacts
466 }
467
468 pub fn has_contact(&self, user: &Arc<User>) -> bool {
469 self.contacts
470 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
471 .is_ok()
472 }
473
474 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
475 &self.incoming_contact_requests
476 }
477
478 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
479 &self.outgoing_contact_requests
480 }
481
482 pub fn is_contact_request_pending(&self, user: &User) -> bool {
483 self.pending_contact_requests.contains_key(&user.id)
484 }
485
486 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
487 if self
488 .contacts
489 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
490 .is_ok()
491 {
492 ContactRequestStatus::RequestAccepted
493 } else if self
494 .outgoing_contact_requests
495 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
496 .is_ok()
497 {
498 ContactRequestStatus::RequestSent
499 } else if self
500 .incoming_contact_requests
501 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
502 .is_ok()
503 {
504 ContactRequestStatus::RequestReceived
505 } else {
506 ContactRequestStatus::None
507 }
508 }
509
510 pub fn request_contact(
511 &mut self,
512 responder_id: u64,
513 cx: &mut Context<Self>,
514 ) -> Task<Result<()>> {
515 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
516 }
517
518 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
519 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
520 }
521
522 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
523 self.incoming_contact_requests
524 .iter()
525 .any(|user| user.id == user_id)
526 }
527
528 pub fn respond_to_contact_request(
529 &mut self,
530 requester_id: u64,
531 accept: bool,
532 cx: &mut Context<Self>,
533 ) -> Task<Result<()>> {
534 self.perform_contact_request(
535 requester_id,
536 proto::RespondToContactRequest {
537 requester_id,
538 response: if accept {
539 proto::ContactRequestResponse::Accept
540 } else {
541 proto::ContactRequestResponse::Decline
542 } as i32,
543 },
544 cx,
545 )
546 }
547
548 pub fn dismiss_contact_request(
549 &self,
550 requester_id: u64,
551 cx: &Context<Self>,
552 ) -> Task<Result<()>> {
553 let client = self.client.upgrade();
554 cx.spawn(async move |_, _| {
555 client
556 .context("can't upgrade client reference")?
557 .request(proto::RespondToContactRequest {
558 requester_id,
559 response: proto::ContactRequestResponse::Dismiss as i32,
560 })
561 .await?;
562 Ok(())
563 })
564 }
565
566 fn perform_contact_request<T: RequestMessage>(
567 &mut self,
568 user_id: u64,
569 request: T,
570 cx: &mut Context<Self>,
571 ) -> Task<Result<()>> {
572 let client = self.client.upgrade();
573 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
574 cx.notify();
575
576 cx.spawn(async move |this, cx| {
577 let response = client
578 .context("can't upgrade client reference")?
579 .request(request)
580 .await;
581 this.update(cx, |this, cx| {
582 if let Entry::Occupied(mut request_count) =
583 this.pending_contact_requests.entry(user_id)
584 {
585 *request_count.get_mut() -= 1;
586 if *request_count.get() == 0 {
587 request_count.remove();
588 }
589 }
590 cx.notify();
591 })?;
592 response?;
593 Ok(())
594 })
595 }
596
597 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
598 let (tx, mut rx) = postage::barrier::channel();
599 self.update_contacts_tx
600 .unbounded_send(UpdateContacts::Clear(tx))
601 .unwrap();
602 async move {
603 rx.next().await;
604 }
605 }
606
607 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
608 let (tx, mut rx) = postage::barrier::channel();
609 self.update_contacts_tx
610 .unbounded_send(UpdateContacts::Wait(tx))
611 .unwrap();
612 async move {
613 rx.next().await;
614 }
615 }
616
617 pub fn get_users(
618 &self,
619 user_ids: Vec<u64>,
620 cx: &Context<Self>,
621 ) -> Task<Result<Vec<Arc<User>>>> {
622 let mut user_ids_to_fetch = user_ids.clone();
623 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
624
625 cx.spawn(async move |this, cx| {
626 if !user_ids_to_fetch.is_empty() {
627 this.update(cx, |this, cx| {
628 this.load_users(
629 proto::GetUsers {
630 user_ids: user_ids_to_fetch,
631 },
632 cx,
633 )
634 })?
635 .await?;
636 }
637
638 this.read_with(cx, |this, _| {
639 user_ids
640 .iter()
641 .map(|user_id| {
642 this.users
643 .get(user_id)
644 .cloned()
645 .with_context(|| format!("user {user_id} not found"))
646 })
647 .collect()
648 })?
649 })
650 }
651
652 pub fn fuzzy_search_users(
653 &self,
654 query: String,
655 cx: &Context<Self>,
656 ) -> Task<Result<Vec<Arc<User>>>> {
657 self.load_users(proto::FuzzySearchUsers { query }, cx)
658 }
659
660 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
661 self.users.get(&user_id).cloned()
662 }
663
664 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
665 if let Some(user) = self.users.get(&user_id).cloned() {
666 return Some(user);
667 }
668
669 self.get_user(user_id, cx).detach_and_log_err(cx);
670 None
671 }
672
673 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
674 if let Some(user) = self.users.get(&user_id).cloned() {
675 return Task::ready(Ok(user));
676 }
677
678 let load_users = self.get_users(vec![user_id], cx);
679 cx.spawn(async move |this, cx| {
680 load_users.await?;
681 this.read_with(cx, |this, _| {
682 this.users
683 .get(&user_id)
684 .cloned()
685 .context("server responded with no users")
686 })?
687 })
688 }
689
690 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
691 self.by_github_login
692 .get(github_login)
693 .and_then(|id| self.users.get(id).cloned())
694 }
695
696 pub fn current_user(&self) -> Option<Arc<User>> {
697 self.current_user.borrow().clone()
698 }
699
700 pub fn current_organization(&self) -> Option<Arc<Organization>> {
701 self.current_organization.clone()
702 }
703
704 pub fn set_current_organization(
705 &mut self,
706 organization: Arc<Organization>,
707 cx: &mut Context<Self>,
708 ) {
709 let is_same_organization = self
710 .current_organization
711 .as_ref()
712 .is_some_and(|current| current.id == organization.id);
713
714 if !is_same_organization {
715 let organization_id = organization.id.0.to_string();
716 self.current_organization.replace(organization);
717 cx.emit(Event::OrganizationChanged);
718 cx.notify();
719
720 let kvp = KeyValueStore::global(cx);
721 db::write_and_log(cx, move || async move {
722 kvp.write_kvp(CURRENT_ORGANIZATION_ID_KEY.into(), organization_id)
723 .await
724 });
725 }
726 }
727
728 pub fn organizations(&self) -> &Vec<Arc<Organization>> {
729 &self.organizations
730 }
731
732 pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
733 self.plans_by_organization.get(organization_id).copied()
734 }
735
736 pub fn current_organization_configuration(&self) -> Option<&OrganizationConfiguration> {
737 let current_organization = self.current_organization.as_ref()?;
738
739 self.configuration_by_organization
740 .get(¤t_organization.id)
741 }
742
743 pub fn plan(&self) -> Option<Plan> {
744 #[cfg(debug_assertions)]
745 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
746 use cloud_api_client::Plan;
747
748 return match plan.as_str() {
749 "free" => Some(Plan::ZedFree),
750 "trial" => Some(Plan::ZedProTrial),
751 "pro" => Some(Plan::ZedPro),
752 _ => {
753 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
754 }
755 };
756 }
757
758 if let Some(organization) = &self.current_organization
759 && let Some(plan) = self.plan_for_organization(&organization.id)
760 {
761 return Some(plan);
762 }
763
764 self.plan_info.as_ref().map(|info| info.plan())
765 }
766
767 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
768 self.plan_info
769 .as_ref()
770 .and_then(|plan| plan.subscription_period)
771 .map(|subscription_period| {
772 (
773 subscription_period.started_at.0,
774 subscription_period.ended_at.0,
775 )
776 })
777 }
778
779 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
780 self.plan_info
781 .as_ref()
782 .and_then(|plan| plan.trial_started_at)
783 .map(|trial_started_at| trial_started_at.0)
784 }
785
786 /// Returns whether the user's account is too new to use the service.
787 pub fn account_too_young(&self) -> bool {
788 self.plan_info
789 .as_ref()
790 .map(|plan| plan.is_account_too_young)
791 .unwrap_or_default()
792 }
793
794 /// Returns whether the current user has overdue invoices and usage should be blocked.
795 pub fn has_overdue_invoices(&self) -> bool {
796 self.plan_info
797 .as_ref()
798 .map(|plan| plan.has_overdue_invoices)
799 .unwrap_or_default()
800 }
801
802 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
803 self.edit_prediction_usage
804 }
805
806 pub fn update_edit_prediction_usage(
807 &mut self,
808 usage: EditPredictionUsage,
809 cx: &mut Context<Self>,
810 ) {
811 self.edit_prediction_usage = Some(usage);
812 cx.notify();
813 }
814
815 pub fn clear_organizations(&mut self) {
816 self.organizations.clear();
817 self.current_organization = None;
818 }
819
820 pub fn clear_plan_and_usage(&mut self) {
821 self.plan_info = None;
822 self.edit_prediction_usage = None;
823 }
824
825 fn update_authenticated_user(
826 &mut self,
827 response: GetAuthenticatedUserResponse,
828 cx: &mut Context<Self>,
829 ) {
830 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
831 cx.update_flags(staff, response.feature_flags);
832 if let Some(client) = self.client.upgrade() {
833 client
834 .telemetry
835 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
836 }
837
838 self.organizations = response.organizations.into_iter().map(Arc::new).collect();
839 let persisted_org_id = KeyValueStore::global(cx)
840 .read_kvp(CURRENT_ORGANIZATION_ID_KEY)
841 .log_err()
842 .flatten()
843 .map(|id| OrganizationId(Arc::from(id)));
844
845 self.current_organization = persisted_org_id
846 .and_then(|persisted_id| {
847 self.organizations
848 .iter()
849 .find(|org| org.id == persisted_id)
850 .cloned()
851 })
852 .or_else(|| {
853 response
854 .default_organization_id
855 .and_then(|default_organization_id| {
856 self.organizations
857 .iter()
858 .find(|organization| organization.id == default_organization_id)
859 .cloned()
860 })
861 })
862 .or_else(|| self.organizations.first().cloned());
863 self.plans_by_organization = response
864 .plans_by_organization
865 .into_iter()
866 .map(|(organization_id, plan)| {
867 let plan = match plan {
868 KnownOrUnknown::Known(plan) => plan,
869 KnownOrUnknown::Unknown(_) => {
870 // If we get a plan that we don't recognize, fall back to the Free plan.
871 Plan::ZedFree
872 }
873 };
874
875 (organization_id, plan)
876 })
877 .collect();
878 self.configuration_by_organization =
879 response.configuration_by_organization.into_iter().collect();
880
881 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
882 limit: response.plan.usage.edit_predictions.limit,
883 amount: response.plan.usage.edit_predictions.used as i32,
884 }));
885 self.plan_info = Some(response.plan);
886 cx.emit(Event::PrivateUserInfoUpdated);
887 }
888
889 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
890 cx.spawn(async move |cx| {
891 match message {
892 MessageToClient::UserUpdated => {
893 let cloud_client = cx
894 .update(|cx| {
895 this.read_with(cx, |this, _cx| {
896 this.client.upgrade().map(|client| client.cloud_client())
897 })
898 })?
899 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
900
901 let response = cloud_client.get_authenticated_user().await?;
902 cx.update(|cx| {
903 this.update(cx, |this, cx| {
904 this.update_authenticated_user(response, cx);
905 })
906 })?;
907 }
908 }
909
910 anyhow::Ok(())
911 })
912 .detach_and_log_err(cx);
913 }
914
915 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
916 self.current_user.clone()
917 }
918
919 fn load_users(
920 &self,
921 request: impl RequestMessage<Response = UsersResponse>,
922 cx: &Context<Self>,
923 ) -> Task<Result<Vec<Arc<User>>>> {
924 let client = self.client.clone();
925 cx.spawn(async move |this, cx| {
926 if let Some(rpc) = client.upgrade() {
927 let response = rpc.request(request).await.context("error loading users")?;
928 let users = response.users;
929
930 this.update(cx, |this, _| this.insert(users))
931 } else {
932 Ok(Vec::new())
933 }
934 })
935 }
936
937 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
938 let mut ret = Vec::with_capacity(users.len());
939 for user in users {
940 let user = User::new(user);
941 if let Some(old) = self.users.insert(user.id, user.clone())
942 && old.github_login != user.github_login
943 {
944 self.by_github_login.remove(&old.github_login);
945 }
946 self.by_github_login
947 .insert(user.github_login.clone(), user.id);
948 ret.push(user)
949 }
950 ret
951 }
952
953 pub fn set_participant_indices(
954 &mut self,
955 participant_indices: HashMap<u64, ParticipantIndex>,
956 cx: &mut Context<Self>,
957 ) {
958 if participant_indices != self.participant_indices {
959 self.participant_indices = participant_indices;
960 cx.emit(Event::ParticipantIndicesChanged);
961 }
962 }
963
964 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
965 &self.participant_indices
966 }
967
968 pub fn participant_names(
969 &self,
970 user_ids: impl Iterator<Item = u64>,
971 cx: &App,
972 ) -> HashMap<u64, SharedString> {
973 let mut ret = HashMap::default();
974 let mut missing_user_ids = Vec::new();
975 for id in user_ids {
976 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
977 ret.insert(id, github_login);
978 } else {
979 missing_user_ids.push(id)
980 }
981 }
982 if !missing_user_ids.is_empty() {
983 let this = self.weak_self.clone();
984 cx.spawn(async move |cx| {
985 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
986 .await
987 })
988 .detach_and_log_err(cx);
989 }
990 ret
991 }
992}
993
994impl User {
995 fn new(message: proto::User) -> Arc<Self> {
996 Arc::new(User {
997 id: message.id,
998 github_login: message.github_login.into(),
999 avatar_uri: message.avatar_url.into(),
1000 name: message.name,
1001 })
1002 }
1003}
1004
1005impl Contact {
1006 async fn from_proto(
1007 contact: proto::Contact,
1008 user_store: &Entity<UserStore>,
1009 cx: &mut AsyncApp,
1010 ) -> Result<Self> {
1011 let user = user_store
1012 .update(cx, |user_store, cx| {
1013 user_store.get_user(contact.user_id, cx)
1014 })
1015 .await?;
1016 Ok(Self {
1017 user,
1018 online: contact.online,
1019 busy: contact.busy,
1020 })
1021 }
1022}
1023
1024impl Collaborator {
1025 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
1026 Ok(Self {
1027 peer_id: message.peer_id.context("invalid peer id")?,
1028 replica_id: ReplicaId::new(message.replica_id as u16),
1029 user_id: message.user_id as UserId,
1030 is_host: message.is_host,
1031 committer_name: message.committer_name,
1032 committer_email: message.committer_email,
1033 })
1034 }
1035}
1036
1037impl RequestUsage {
1038 pub fn over_limit(&self) -> bool {
1039 match self.limit {
1040 UsageLimit::Limited(limit) => self.amount >= limit,
1041 UsageLimit::Unlimited => false,
1042 }
1043 }
1044
1045 fn from_headers(
1046 limit_name: &str,
1047 amount_name: &str,
1048 headers: &HeaderMap<HeaderValue>,
1049 ) -> Result<Self> {
1050 let limit = headers
1051 .get(limit_name)
1052 .with_context(|| format!("missing {limit_name:?} header"))?;
1053 let limit = UsageLimit::from_str(limit.to_str()?)?;
1054
1055 let amount = headers
1056 .get(amount_name)
1057 .with_context(|| format!("missing {amount_name:?} header"))?;
1058 let amount = amount.to_str()?.parse::<i32>()?;
1059
1060 Ok(Self { limit, amount })
1061 }
1062}
1063
1064impl EditPredictionUsage {
1065 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1066 Ok(Self(RequestUsage::from_headers(
1067 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1068 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1069 headers,
1070 )?))
1071 }
1072}