1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Context as _;
5use chrono::{DateTime, Utc};
6use cloud_api_client::{AuthenticatedUser, CloudApiClient, GetAuthenticatedUserResponse, PlanInfo};
7use cloud_llm_client::Plan;
8use gpui::{Context, Entity, Subscription, Task};
9use util::{ResultExt as _, maybe};
10
11use crate::UserStore;
12use crate::user::Event as RpcUserStoreEvent;
13
14pub struct CloudUserStore {
15 cloud_client: Arc<CloudApiClient>,
16 authenticated_user: Option<Arc<AuthenticatedUser>>,
17 plan_info: Option<Arc<PlanInfo>>,
18 _maintain_authenticated_user_task: Task<()>,
19 _rpc_plan_updated_subscription: Subscription,
20}
21
22impl CloudUserStore {
23 pub fn new(
24 cloud_client: Arc<CloudApiClient>,
25 rpc_user_store: Entity<UserStore>,
26 cx: &mut Context<Self>,
27 ) -> Self {
28 let rpc_plan_updated_subscription =
29 cx.subscribe(&rpc_user_store, Self::handle_rpc_user_store_event);
30
31 Self {
32 cloud_client: cloud_client.clone(),
33 authenticated_user: None,
34 plan_info: None,
35 _maintain_authenticated_user_task: cx.spawn(async move |this, cx| {
36 maybe!(async move {
37 loop {
38 let Some(this) = this.upgrade() else {
39 return anyhow::Ok(());
40 };
41
42 if cloud_client.has_credentials() {
43 let already_fetched_authenticated_user = this
44 .read_with(cx, |this, _cx| this.authenticated_user().is_some())
45 .unwrap_or(false);
46
47 if already_fetched_authenticated_user {
48 // We already fetched the authenticated user; nothing to do.
49 } else {
50 let authenticated_user_result = cloud_client
51 .get_authenticated_user()
52 .await
53 .context("failed to fetch authenticated user");
54 if let Some(response) = authenticated_user_result.log_err() {
55 this.update(cx, |this, _cx| {
56 this.update_authenticated_user(response);
57 })
58 .ok();
59 }
60 }
61 } else {
62 this.update(cx, |this, _cx| {
63 this.authenticated_user.take();
64 this.plan_info.take();
65 })
66 .ok();
67 }
68
69 cx.background_executor()
70 .timer(Duration::from_millis(100))
71 .await;
72 }
73 })
74 .await
75 .log_err();
76 }),
77 _rpc_plan_updated_subscription: rpc_plan_updated_subscription,
78 }
79 }
80
81 pub fn is_authenticated(&self) -> bool {
82 self.authenticated_user.is_some()
83 }
84
85 pub fn authenticated_user(&self) -> Option<Arc<AuthenticatedUser>> {
86 self.authenticated_user.clone()
87 }
88
89 pub fn plan(&self) -> Option<Plan> {
90 self.plan_info.as_ref().map(|plan| plan.plan)
91 }
92
93 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
94 self.plan_info
95 .as_ref()
96 .and_then(|plan| plan.subscription_period)
97 .map(|subscription_period| {
98 (
99 subscription_period.started_at.0,
100 subscription_period.ended_at.0,
101 )
102 })
103 }
104
105 fn update_authenticated_user(&mut self, response: GetAuthenticatedUserResponse) {
106 self.authenticated_user = Some(Arc::new(response.user));
107 self.plan_info = Some(Arc::new(response.plan));
108 }
109
110 fn handle_rpc_user_store_event(
111 &mut self,
112 _: Entity<UserStore>,
113 event: &RpcUserStoreEvent,
114 cx: &mut Context<Self>,
115 ) {
116 match event {
117 RpcUserStoreEvent::PlanUpdated => {
118 cx.spawn(async move |this, cx| {
119 let cloud_client =
120 cx.update(|cx| this.read_with(cx, |this, _cx| this.cloud_client.clone()))??;
121
122 let response = cloud_client
123 .get_authenticated_user()
124 .await
125 .context("failed to fetch authenticated user")?;
126
127 cx.update(|cx| {
128 this.update(cx, |this, _cx| {
129 this.update_authenticated_user(response);
130 })
131 })??;
132
133 anyhow::Ok(())
134 })
135 .detach_and_log_err(cx);
136 }
137 _ => {}
138 }
139 }
140}