user_backfiller.rs

  1use std::sync::Arc;
  2
  3use anyhow::{Context as _, Result};
  4use chrono::{DateTime, Utc};
  5use util::ResultExt;
  6
  7use crate::db::Database;
  8use crate::executor::Executor;
  9use crate::{AppState, Config};
 10
 11pub fn spawn_user_backfiller(app_state: Arc<AppState>) {
 12    let Some(user_backfiller_github_access_token) =
 13        app_state.config.user_backfiller_github_access_token.clone()
 14    else {
 15        log::info!("no USER_BACKFILLER_GITHUB_ACCESS_TOKEN set; not spawning user backfiller");
 16        return;
 17    };
 18
 19    let executor = app_state.executor.clone();
 20    executor.spawn_detached({
 21        let executor = executor.clone();
 22        async move {
 23            let user_backfiller = UserBackfiller::new(
 24                app_state.config.clone(),
 25                user_backfiller_github_access_token,
 26                app_state.db.clone(),
 27                executor,
 28            );
 29
 30            log::info!("backfilling users");
 31
 32            user_backfiller
 33                .backfill_github_user_created_at()
 34                .await
 35                .log_err();
 36        }
 37    });
 38}
 39
 40const GITHUB_REQUESTS_PER_HOUR_LIMIT: usize = 5_000;
 41const SLEEP_DURATION_BETWEEN_USERS: std::time::Duration = std::time::Duration::from_millis(
 42    (GITHUB_REQUESTS_PER_HOUR_LIMIT as f64 / 60. / 60. * 1000.) as u64,
 43);
 44
/// Backfills data about existing users by querying the GitHub API.
struct UserBackfiller {
    /// Application configuration; read for `auto_join_channel_id` when updating users.
    config: Config,
    /// Token sent as the `Bearer` credential in the `authorization` header of GitHub requests.
    github_access_token: Arc<str>,
    /// Database used to list users missing a GitHub creation time and to store the fetched value.
    db: Arc<Database>,
    /// HTTP client used for the GitHub requests.
    http_client: reqwest::Client,
    /// Executor used to sleep between users and while waiting for a rate limit to reset.
    executor: Executor,
}
 52
 53impl UserBackfiller {
 54    fn new(
 55        config: Config,
 56        github_access_token: Arc<str>,
 57        db: Arc<Database>,
 58        executor: Executor,
 59    ) -> Self {
 60        Self {
 61            config,
 62            github_access_token,
 63            db,
 64            http_client: reqwest::Client::new(),
 65            executor,
 66        }
 67    }
 68
 69    async fn backfill_github_user_created_at(&self) -> Result<()> {
 70        let initial_channel_id = self.config.auto_join_channel_id;
 71
 72        let users_missing_github_user_created_at =
 73            self.db.get_users_missing_github_user_created_at().await?;
 74
 75        for user in users_missing_github_user_created_at {
 76            match self
 77                .fetch_github_user(&format!(
 78                    "https://api.github.com/user/{}",
 79                    user.github_user_id
 80                ))
 81                .await
 82            {
 83                Ok(github_user) => {
 84                    self.db
 85                        .update_or_create_user_by_github_account(
 86                            &user.github_login,
 87                            github_user.id,
 88                            user.email_address.as_deref(),
 89                            user.name.as_deref(),
 90                            github_user.created_at,
 91                            initial_channel_id,
 92                        )
 93                        .await?;
 94
 95                    log::info!("backfilled user: {}", user.github_login);
 96                }
 97                Err(err) => {
 98                    log::error!("failed to fetch GitHub user {}: {err}", user.github_login);
 99                }
100            }
101
102            self.executor.sleep(SLEEP_DURATION_BETWEEN_USERS).await;
103        }
104
105        Ok(())
106    }
107
108    async fn fetch_github_user(&self, url: &str) -> Result<GithubUser> {
109        let response = self
110            .http_client
111            .get(url)
112            .header(
113                "authorization",
114                format!("Bearer {}", self.github_access_token),
115            )
116            .header("user-agent", "zed")
117            .send()
118            .await
119            .with_context(|| format!("failed to fetch '{url}'"))?;
120
121        let rate_limit_remaining = response
122            .headers()
123            .get("x-ratelimit-remaining")
124            .and_then(|value| value.to_str().ok())
125            .and_then(|value| value.parse::<i32>().ok());
126        let rate_limit_reset = response
127            .headers()
128            .get("x-ratelimit-reset")
129            .and_then(|value| value.to_str().ok())
130            .and_then(|value| value.parse::<i64>().ok())
131            .and_then(|value| DateTime::from_timestamp(value, 0));
132
133        if rate_limit_remaining == Some(0)
134            && let Some(reset_at) = rate_limit_reset
135        {
136            let now = Utc::now();
137            if reset_at > now {
138                let sleep_duration = reset_at - now;
139                log::info!(
140                    "rate limit reached. Sleeping for {} seconds",
141                    sleep_duration.num_seconds()
142                );
143                self.executor.sleep(sleep_duration.to_std().unwrap()).await;
144            }
145        }
146
147        response
148            .error_for_status()
149            .context("fetching GitHub user")?
150            .json()
151            .await
152            .with_context(|| format!("failed to deserialize GitHub user from '{url}'"))
153    }
154}
155
/// The fields of a GitHub user response that this file deserializes.
#[derive(serde::Deserialize)]
struct GithubUser {
    /// Passed as the GitHub user id when updating the user in the database.
    id: i32,
    /// Passed as the GitHub account creation time when updating the user in the database.
    created_at: DateTime<Utc>,
    // Deserialized but not read anywhere in this file; the lint expectation below records that.
    #[expect(
        unused,
        reason = "This field was found to be unused with serde library bump; it's left as is due to insufficient context on PO's side, but it *may* be fine to remove"
    )]
    name: Option<String>,
}