user_store.rs

  1use std::sync::Arc;
  2use std::time::Duration;
  3
  4use anyhow::Context as _;
  5use chrono::{DateTime, Utc};
  6use cloud_api_client::{AuthenticatedUser, CloudApiClient, GetAuthenticatedUserResponse, PlanInfo};
  7use cloud_llm_client::Plan;
  8use gpui::{Context, Entity, Subscription, Task};
  9use util::{ResultExt as _, maybe};
 10
 11use crate::UserStore;
 12use crate::user::Event as RpcUserStoreEvent;
 13
 14pub struct CloudUserStore {
 15    cloud_client: Arc<CloudApiClient>,
 16    authenticated_user: Option<Arc<AuthenticatedUser>>,
 17    plan_info: Option<Arc<PlanInfo>>,
 18    _maintain_authenticated_user_task: Task<()>,
 19    _rpc_plan_updated_subscription: Subscription,
 20}
 21
 22impl CloudUserStore {
 23    pub fn new(
 24        cloud_client: Arc<CloudApiClient>,
 25        rpc_user_store: Entity<UserStore>,
 26        cx: &mut Context<Self>,
 27    ) -> Self {
 28        let rpc_plan_updated_subscription =
 29            cx.subscribe(&rpc_user_store, Self::handle_rpc_user_store_event);
 30
 31        Self {
 32            cloud_client: cloud_client.clone(),
 33            authenticated_user: None,
 34            plan_info: None,
 35            _maintain_authenticated_user_task: cx.spawn(async move |this, cx| {
 36                maybe!(async move {
 37                    loop {
 38                        let Some(this) = this.upgrade() else {
 39                            return anyhow::Ok(());
 40                        };
 41
 42                        if cloud_client.has_credentials() {
 43                            let already_fetched_authenticated_user = this
 44                                .read_with(cx, |this, _cx| this.authenticated_user().is_some())
 45                                .unwrap_or(false);
 46
 47                            if already_fetched_authenticated_user {
 48                                // We already fetched the authenticated user; nothing to do.
 49                            } else {
 50                                let authenticated_user_result = cloud_client
 51                                    .get_authenticated_user()
 52                                    .await
 53                                    .context("failed to fetch authenticated user");
 54                                if let Some(response) = authenticated_user_result.log_err() {
 55                                    this.update(cx, |this, _cx| {
 56                                        this.update_authenticated_user(response);
 57                                    })
 58                                    .ok();
 59                                }
 60                            }
 61                        } else {
 62                            this.update(cx, |this, _cx| {
 63                                this.authenticated_user.take();
 64                                this.plan_info.take();
 65                            })
 66                            .ok();
 67                        }
 68
 69                        cx.background_executor()
 70                            .timer(Duration::from_millis(100))
 71                            .await;
 72                    }
 73                })
 74                .await
 75                .log_err();
 76            }),
 77            _rpc_plan_updated_subscription: rpc_plan_updated_subscription,
 78        }
 79    }
 80
 81    pub fn is_authenticated(&self) -> bool {
 82        self.authenticated_user.is_some()
 83    }
 84
 85    pub fn authenticated_user(&self) -> Option<Arc<AuthenticatedUser>> {
 86        self.authenticated_user.clone()
 87    }
 88
 89    pub fn plan(&self) -> Option<Plan> {
 90        self.plan_info.as_ref().map(|plan| plan.plan)
 91    }
 92
 93    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 94        self.plan_info
 95            .as_ref()
 96            .and_then(|plan| plan.subscription_period)
 97            .map(|subscription_period| {
 98                (
 99                    subscription_period.started_at.0,
100                    subscription_period.ended_at.0,
101                )
102            })
103    }
104
105    fn update_authenticated_user(&mut self, response: GetAuthenticatedUserResponse) {
106        self.authenticated_user = Some(Arc::new(response.user));
107        self.plan_info = Some(Arc::new(response.plan));
108    }
109
110    fn handle_rpc_user_store_event(
111        &mut self,
112        _: Entity<UserStore>,
113        event: &RpcUserStoreEvent,
114        cx: &mut Context<Self>,
115    ) {
116        match event {
117            RpcUserStoreEvent::PlanUpdated => {
118                cx.spawn(async move |this, cx| {
119                    let cloud_client =
120                        cx.update(|cx| this.read_with(cx, |this, _cx| this.cloud_client.clone()))??;
121
122                    let response = cloud_client
123                        .get_authenticated_user()
124                        .await
125                        .context("failed to fetch authenticated user")?;
126
127                    cx.update(|cx| {
128                        this.update(cx, |this, _cx| {
129                            this.update_authenticated_user(response);
130                        })
131                    })??;
132
133                    anyhow::Ok(())
134                })
135                .detach_and_log_err(cx);
136            }
137            _ => {}
138        }
139    }
140}