billing.rs

  1use anyhow::{Context as _, bail};
  2use axum::{Extension, Json, Router, extract, routing::post};
  3use chrono::{DateTime, Utc};
  4use collections::{HashMap, HashSet};
  5use reqwest::StatusCode;
  6use sea_orm::ActiveValue;
  7use serde::{Deserialize, Serialize};
  8use std::{sync::Arc, time::Duration};
  9use stripe::{CancellationDetailsReason, EventObject, EventType, ListEvents, SubscriptionStatus};
 10use util::{ResultExt, maybe};
 11use zed_llm_client::LanguageModelProvider;
 12
 13use crate::db::billing_subscription::{
 14    StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
 15};
 16use crate::llm::db::subscription_usage_meter::{self, CompletionMode};
 17use crate::rpc::{ResultExt as _, Server};
 18use crate::stripe_client::{
 19    StripeCancellationDetailsReason, StripeClient, StripeCustomerId, StripeSubscription,
 20    StripeSubscriptionId,
 21};
 22use crate::{AppState, Error, Result};
 23use crate::{db::UserId, llm::db::LlmDatabase};
 24use crate::{
 25    db::{
 26        CreateBillingCustomerParams, CreateBillingSubscriptionParams,
 27        CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
 28        UpdateBillingSubscriptionParams, billing_customer,
 29    },
 30    stripe_billing::StripeBilling,
 31};
 32
 33pub fn router() -> Router {
 34    Router::new().route(
 35        "/billing/subscriptions/sync",
 36        post(sync_billing_subscription),
 37    )
 38}
 39
 40#[derive(Debug, Deserialize)]
 41struct SyncBillingSubscriptionBody {
 42    github_user_id: i32,
 43}
 44
 45#[derive(Debug, Serialize)]
 46struct SyncBillingSubscriptionResponse {
 47    stripe_customer_id: String,
 48}
 49
 50async fn sync_billing_subscription(
 51    Extension(app): Extension<Arc<AppState>>,
 52    extract::Json(body): extract::Json<SyncBillingSubscriptionBody>,
 53) -> Result<Json<SyncBillingSubscriptionResponse>> {
 54    let Some(stripe_client) = app.stripe_client.clone() else {
 55        log::error!("failed to retrieve Stripe client");
 56        Err(Error::http(
 57            StatusCode::NOT_IMPLEMENTED,
 58            "not supported".into(),
 59        ))?
 60    };
 61
 62    let user = app
 63        .db
 64        .get_user_by_github_user_id(body.github_user_id)
 65        .await?
 66        .context("user not found")?;
 67
 68    let billing_customer = app
 69        .db
 70        .get_billing_customer_by_user_id(user.id)
 71        .await?
 72        .context("billing customer not found")?;
 73    let stripe_customer_id = StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
 74
 75    let subscriptions = stripe_client
 76        .list_subscriptions_for_customer(&stripe_customer_id)
 77        .await?;
 78
 79    for subscription in subscriptions {
 80        let subscription_id = subscription.id.clone();
 81
 82        sync_subscription(&app, &stripe_client, subscription)
 83            .await
 84            .with_context(|| {
 85                format!(
 86                    "failed to sync subscription {subscription_id} for user {}",
 87                    user.id,
 88                )
 89            })?;
 90    }
 91
 92    Ok(Json(SyncBillingSubscriptionResponse {
 93        stripe_customer_id: billing_customer.stripe_customer_id.clone(),
 94    }))
 95}
 96
 97/// The amount of time we wait in between each poll of Stripe events.
 98///
 99/// This value should strike a balance between:
100///   1. Being short enough that we update quickly when something in Stripe changes
101///   2. Being long enough that we don't eat into our rate limits.
102///
103/// As a point of reference, the Sequin folks say they have this at **500ms**:
104///
105/// > We poll the Stripe /events endpoint every 500ms per account
106/// >
107/// > — https://blog.sequinstream.com/events-not-webhooks/
108const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
109
110/// The maximum number of events to return per page.
111///
112/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
113///
114/// > Limit can range between 1 and 100, and the default is 10.
115const EVENTS_LIMIT_PER_PAGE: u64 = 100;
116
117/// The number of pages consisting entirely of already-processed events that we
118/// will see before we stop retrieving events.
119///
120/// This is used to prevent over-fetching the Stripe events API for events we've
121/// already seen and processed.
122const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
123
124/// Polls the Stripe events API periodically to reconcile the records in our
125/// database with the data in Stripe.
126pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
127    let Some(real_stripe_client) = app.real_stripe_client.clone() else {
128        log::warn!("failed to retrieve Stripe client");
129        return;
130    };
131    let Some(stripe_client) = app.stripe_client.clone() else {
132        log::warn!("failed to retrieve Stripe client");
133        return;
134    };
135
136    let executor = app.executor.clone();
137    executor.spawn_detached({
138        let executor = executor.clone();
139        async move {
140            loop {
141                poll_stripe_events(&app, &rpc_server, &stripe_client, &real_stripe_client)
142                    .await
143                    .log_err();
144
145                executor.sleep(POLL_EVENTS_INTERVAL).await;
146            }
147        }
148    });
149}
150
151async fn poll_stripe_events(
152    app: &Arc<AppState>,
153    rpc_server: &Arc<Server>,
154    stripe_client: &Arc<dyn StripeClient>,
155    real_stripe_client: &stripe::Client,
156) -> anyhow::Result<()> {
157    fn event_type_to_string(event_type: EventType) -> String {
158        // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
159        // so we need to unquote it.
160        event_type.to_string().trim_matches('"').to_string()
161    }
162
163    let event_types = [
164        EventType::CustomerCreated,
165        EventType::CustomerUpdated,
166        EventType::CustomerSubscriptionCreated,
167        EventType::CustomerSubscriptionUpdated,
168        EventType::CustomerSubscriptionPaused,
169        EventType::CustomerSubscriptionResumed,
170        EventType::CustomerSubscriptionDeleted,
171    ]
172    .into_iter()
173    .map(event_type_to_string)
174    .collect::<Vec<_>>();
175
176    let mut pages_of_already_processed_events = 0;
177    let mut unprocessed_events = Vec::new();
178
179    log::info!(
180        "Stripe events: starting retrieval for {}",
181        event_types.join(", ")
182    );
183    let mut params = ListEvents::new();
184    params.types = Some(event_types.clone());
185    params.limit = Some(EVENTS_LIMIT_PER_PAGE);
186
187    let mut event_pages = stripe::Event::list(&real_stripe_client, &params)
188        .await?
189        .paginate(params);
190
191    loop {
192        let processed_event_ids = {
193            let event_ids = event_pages
194                .page
195                .data
196                .iter()
197                .map(|event| event.id.as_str())
198                .collect::<Vec<_>>();
199            app.db
200                .get_processed_stripe_events_by_event_ids(&event_ids)
201                .await?
202                .into_iter()
203                .map(|event| event.stripe_event_id)
204                .collect::<Vec<_>>()
205        };
206
207        let mut processed_events_in_page = 0;
208        let events_in_page = event_pages.page.data.len();
209        for event in &event_pages.page.data {
210            if processed_event_ids.contains(&event.id.to_string()) {
211                processed_events_in_page += 1;
212                log::debug!("Stripe events: already processed '{}', skipping", event.id);
213            } else {
214                unprocessed_events.push(event.clone());
215            }
216        }
217
218        if processed_events_in_page == events_in_page {
219            pages_of_already_processed_events += 1;
220        }
221
222        if event_pages.page.has_more {
223            if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
224            {
225                log::info!(
226                    "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
227                );
228                break;
229            } else {
230                log::info!("Stripe events: retrieving next page");
231                event_pages = event_pages.next(&real_stripe_client).await?;
232            }
233        } else {
234            break;
235        }
236    }
237
238    log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
239
240    // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
241    unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
242
243    for event in unprocessed_events {
244        let event_id = event.id.clone();
245        let processed_event_params = CreateProcessedStripeEventParams {
246            stripe_event_id: event.id.to_string(),
247            stripe_event_type: event_type_to_string(event.type_),
248            stripe_event_created_timestamp: event.created,
249        };
250
251        // If the event has happened too far in the past, we don't want to
252        // process it and risk overwriting other more-recent updates.
253        //
254        // 1 day was chosen arbitrarily. This could be made longer or shorter.
255        let one_day = Duration::from_secs(24 * 60 * 60);
256        let a_day_ago = Utc::now() - one_day;
257        if a_day_ago.timestamp() > event.created {
258            log::info!(
259                "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
260                event_id
261            );
262            app.db
263                .create_processed_stripe_event(&processed_event_params)
264                .await?;
265
266            continue;
267        }
268
269        let process_result = match event.type_ {
270            EventType::CustomerCreated | EventType::CustomerUpdated => {
271                handle_customer_event(app, real_stripe_client, event).await
272            }
273            EventType::CustomerSubscriptionCreated
274            | EventType::CustomerSubscriptionUpdated
275            | EventType::CustomerSubscriptionPaused
276            | EventType::CustomerSubscriptionResumed
277            | EventType::CustomerSubscriptionDeleted => {
278                handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
279            }
280            _ => Ok(()),
281        };
282
283        if let Some(()) = process_result
284            .with_context(|| format!("failed to process event {event_id} successfully"))
285            .log_err()
286        {
287            app.db
288                .create_processed_stripe_event(&processed_event_params)
289                .await?;
290        }
291    }
292
293    Ok(())
294}
295
296async fn handle_customer_event(
297    app: &Arc<AppState>,
298    _stripe_client: &stripe::Client,
299    event: stripe::Event,
300) -> anyhow::Result<()> {
301    let EventObject::Customer(customer) = event.data.object else {
302        bail!("unexpected event payload for {}", event.id);
303    };
304
305    log::info!("handling Stripe {} event: {}", event.type_, event.id);
306
307    let Some(email) = customer.email else {
308        log::info!("Stripe customer has no email: skipping");
309        return Ok(());
310    };
311
312    let Some(user) = app.db.get_user_by_email(&email).await? else {
313        log::info!("no user found for email: skipping");
314        return Ok(());
315    };
316
317    if let Some(existing_customer) = app
318        .db
319        .get_billing_customer_by_stripe_customer_id(&customer.id)
320        .await?
321    {
322        app.db
323            .update_billing_customer(
324                existing_customer.id,
325                &UpdateBillingCustomerParams {
326                    // For now we just leave the information as-is, as it is not
327                    // likely to change.
328                    ..Default::default()
329                },
330            )
331            .await?;
332    } else {
333        app.db
334            .create_billing_customer(&CreateBillingCustomerParams {
335                user_id: user.id,
336                stripe_customer_id: customer.id.to_string(),
337            })
338            .await?;
339    }
340
341    Ok(())
342}
343
344async fn sync_subscription(
345    app: &Arc<AppState>,
346    stripe_client: &Arc<dyn StripeClient>,
347    subscription: StripeSubscription,
348) -> anyhow::Result<billing_customer::Model> {
349    let subscription_kind = if let Some(stripe_billing) = &app.stripe_billing {
350        stripe_billing
351            .determine_subscription_kind(&subscription)
352            .await
353    } else {
354        None
355    };
356
357    let billing_customer =
358        find_or_create_billing_customer(app, stripe_client.as_ref(), &subscription.customer)
359            .await?
360            .context("billing customer not found")?;
361
362    if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
363        if subscription.status == SubscriptionStatus::Trialing {
364            let current_period_start =
365                DateTime::from_timestamp(subscription.current_period_start, 0)
366                    .context("No trial subscription period start")?;
367
368            app.db
369                .update_billing_customer(
370                    billing_customer.id,
371                    &UpdateBillingCustomerParams {
372                        trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
373                        ..Default::default()
374                    },
375                )
376                .await?;
377        }
378    }
379
380    let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
381        && subscription
382            .cancellation_details
383            .as_ref()
384            .and_then(|details| details.reason)
385            .map_or(false, |reason| {
386                reason == StripeCancellationDetailsReason::PaymentFailed
387            });
388
389    if was_canceled_due_to_payment_failure {
390        app.db
391            .update_billing_customer(
392                billing_customer.id,
393                &UpdateBillingCustomerParams {
394                    has_overdue_invoices: ActiveValue::set(true),
395                    ..Default::default()
396                },
397            )
398            .await?;
399    }
400
401    if let Some(existing_subscription) = app
402        .db
403        .get_billing_subscription_by_stripe_subscription_id(subscription.id.0.as_ref())
404        .await?
405    {
406        app.db
407            .update_billing_subscription(
408                existing_subscription.id,
409                &UpdateBillingSubscriptionParams {
410                    billing_customer_id: ActiveValue::set(billing_customer.id),
411                    kind: ActiveValue::set(subscription_kind),
412                    stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
413                    stripe_subscription_status: ActiveValue::set(subscription.status.into()),
414                    stripe_cancel_at: ActiveValue::set(
415                        subscription
416                            .cancel_at
417                            .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
418                            .map(|time| time.naive_utc()),
419                    ),
420                    stripe_cancellation_reason: ActiveValue::set(
421                        subscription
422                            .cancellation_details
423                            .and_then(|details| details.reason)
424                            .map(|reason| reason.into()),
425                    ),
426                    stripe_current_period_start: ActiveValue::set(Some(
427                        subscription.current_period_start,
428                    )),
429                    stripe_current_period_end: ActiveValue::set(Some(
430                        subscription.current_period_end,
431                    )),
432                },
433            )
434            .await?;
435    } else {
436        if let Some(existing_subscription) = app
437            .db
438            .get_active_billing_subscription(billing_customer.user_id)
439            .await?
440        {
441            if existing_subscription.kind == Some(SubscriptionKind::ZedFree)
442                && subscription_kind == Some(SubscriptionKind::ZedProTrial)
443            {
444                let stripe_subscription_id = StripeSubscriptionId(
445                    existing_subscription.stripe_subscription_id.clone().into(),
446                );
447
448                stripe_client
449                    .cancel_subscription(&stripe_subscription_id)
450                    .await?;
451            } else {
452                // If the user already has an active billing subscription, ignore the
453                // event and return an `Ok` to signal that it was processed
454                // successfully.
455                //
456                // There is the possibility that this could cause us to not create a
457                // subscription in the following scenario:
458                //
459                //   1. User has an active subscription A
460                //   2. User cancels subscription A
461                //   3. User creates a new subscription B
462                //   4. We process the new subscription B before the cancellation of subscription A
463                //   5. User ends up with no subscriptions
464                //
465                // In theory this situation shouldn't arise as we try to process the events in the order they occur.
466
467                log::info!(
468                    "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
469                    user_id = billing_customer.user_id,
470                    subscription_id = subscription.id
471                );
472                return Ok(billing_customer);
473            }
474        }
475
476        app.db
477            .create_billing_subscription(&CreateBillingSubscriptionParams {
478                billing_customer_id: billing_customer.id,
479                kind: subscription_kind,
480                stripe_subscription_id: subscription.id.to_string(),
481                stripe_subscription_status: subscription.status.into(),
482                stripe_cancellation_reason: subscription
483                    .cancellation_details
484                    .and_then(|details| details.reason)
485                    .map(|reason| reason.into()),
486                stripe_current_period_start: Some(subscription.current_period_start),
487                stripe_current_period_end: Some(subscription.current_period_end),
488            })
489            .await?;
490    }
491
492    if let Some(stripe_billing) = app.stripe_billing.as_ref() {
493        if subscription.status == SubscriptionStatus::Canceled
494            || subscription.status == SubscriptionStatus::Paused
495        {
496            let already_has_active_billing_subscription = app
497                .db
498                .has_active_billing_subscription(billing_customer.user_id)
499                .await?;
500            if !already_has_active_billing_subscription {
501                let stripe_customer_id =
502                    StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
503
504                stripe_billing
505                    .subscribe_to_zed_free(stripe_customer_id)
506                    .await?;
507            }
508        }
509    }
510
511    Ok(billing_customer)
512}
513
514async fn handle_customer_subscription_event(
515    app: &Arc<AppState>,
516    rpc_server: &Arc<Server>,
517    stripe_client: &Arc<dyn StripeClient>,
518    event: stripe::Event,
519) -> anyhow::Result<()> {
520    let EventObject::Subscription(subscription) = event.data.object else {
521        bail!("unexpected event payload for {}", event.id);
522    };
523
524    log::info!("handling Stripe {} event: {}", event.type_, event.id);
525
526    let billing_customer = sync_subscription(app, stripe_client, subscription.into()).await?;
527
528    // When the user's subscription changes, push down any changes to their plan.
529    rpc_server
530        .update_plan_for_user_legacy(billing_customer.user_id)
531        .await
532        .trace_err();
533
534    // When the user's subscription changes, we want to refresh their LLM tokens
535    // to either grant/revoke access.
536    rpc_server
537        .refresh_llm_tokens_for_user(billing_customer.user_id)
538        .await;
539
540    Ok(())
541}
542
543impl From<SubscriptionStatus> for StripeSubscriptionStatus {
544    fn from(value: SubscriptionStatus) -> Self {
545        match value {
546            SubscriptionStatus::Incomplete => Self::Incomplete,
547            SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
548            SubscriptionStatus::Trialing => Self::Trialing,
549            SubscriptionStatus::Active => Self::Active,
550            SubscriptionStatus::PastDue => Self::PastDue,
551            SubscriptionStatus::Canceled => Self::Canceled,
552            SubscriptionStatus::Unpaid => Self::Unpaid,
553            SubscriptionStatus::Paused => Self::Paused,
554        }
555    }
556}
557
558impl From<CancellationDetailsReason> for StripeCancellationReason {
559    fn from(value: CancellationDetailsReason) -> Self {
560        match value {
561            CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
562            CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
563            CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
564        }
565    }
566}
567
568/// Finds or creates a billing customer using the provided customer.
569pub async fn find_or_create_billing_customer(
570    app: &Arc<AppState>,
571    stripe_client: &dyn StripeClient,
572    customer_id: &StripeCustomerId,
573) -> anyhow::Result<Option<billing_customer::Model>> {
574    // If we already have a billing customer record associated with the Stripe customer,
575    // there's nothing more we need to do.
576    if let Some(billing_customer) = app
577        .db
578        .get_billing_customer_by_stripe_customer_id(customer_id.0.as_ref())
579        .await?
580    {
581        return Ok(Some(billing_customer));
582    }
583
584    let customer = stripe_client.get_customer(customer_id).await?;
585
586    let Some(email) = customer.email else {
587        return Ok(None);
588    };
589
590    let Some(user) = app.db.get_user_by_email(&email).await? else {
591        return Ok(None);
592    };
593
594    let billing_customer = app
595        .db
596        .create_billing_customer(&CreateBillingCustomerParams {
597            user_id: user.id,
598            stripe_customer_id: customer.id.to_string(),
599        })
600        .await?;
601
602    Ok(Some(billing_customer))
603}
604
605const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
606
607pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
608    let Some(stripe_billing) = app.stripe_billing.clone() else {
609        log::warn!("failed to retrieve Stripe billing object");
610        return;
611    };
612    let Some(llm_db) = app.llm_db.clone() else {
613        log::warn!("failed to retrieve LLM database");
614        return;
615    };
616
617    let executor = app.executor.clone();
618    executor.spawn_detached({
619        let executor = executor.clone();
620        async move {
621            loop {
622                sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
623                    .await
624                    .context("failed to sync LLM request usage to Stripe")
625                    .trace_err();
626                executor
627                    .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
628                    .await;
629            }
630        }
631    });
632}
633
634async fn sync_model_request_usage_with_stripe(
635    app: &Arc<AppState>,
636    llm_db: &Arc<LlmDatabase>,
637    stripe_billing: &Arc<StripeBilling>,
638) -> anyhow::Result<()> {
639    log::info!("Stripe usage sync: Starting");
640    let started_at = Utc::now();
641
642    let staff_users = app.db.get_staff_users().await?;
643    let staff_user_ids = staff_users
644        .iter()
645        .map(|user| user.id)
646        .collect::<HashSet<UserId>>();
647
648    let usage_meters = llm_db
649        .get_current_subscription_usage_meters(Utc::now())
650        .await?;
651    let mut usage_meters_by_user_id =
652        HashMap::<UserId, Vec<subscription_usage_meter::Model>>::default();
653    for (usage_meter, usage) in usage_meters {
654        let meters = usage_meters_by_user_id.entry(usage.user_id).or_default();
655        meters.push(usage_meter);
656    }
657
658    log::info!("Stripe usage sync: Retrieving Zed Pro subscriptions");
659    let get_zed_pro_subscriptions_started_at = Utc::now();
660    let billing_subscriptions = app.db.get_active_zed_pro_billing_subscriptions().await?;
661    log::info!(
662        "Stripe usage sync: Retrieved {} Zed Pro subscriptions in {}",
663        billing_subscriptions.len(),
664        Utc::now() - get_zed_pro_subscriptions_started_at
665    );
666
667    let claude_sonnet_4 = stripe_billing
668        .find_price_by_lookup_key("claude-sonnet-4-requests")
669        .await?;
670    let claude_sonnet_4_max = stripe_billing
671        .find_price_by_lookup_key("claude-sonnet-4-requests-max")
672        .await?;
673    let claude_opus_4 = stripe_billing
674        .find_price_by_lookup_key("claude-opus-4-requests")
675        .await?;
676    let claude_opus_4_max = stripe_billing
677        .find_price_by_lookup_key("claude-opus-4-requests-max")
678        .await?;
679    let claude_3_5_sonnet = stripe_billing
680        .find_price_by_lookup_key("claude-3-5-sonnet-requests")
681        .await?;
682    let claude_3_7_sonnet = stripe_billing
683        .find_price_by_lookup_key("claude-3-7-sonnet-requests")
684        .await?;
685    let claude_3_7_sonnet_max = stripe_billing
686        .find_price_by_lookup_key("claude-3-7-sonnet-requests-max")
687        .await?;
688
689    let model_mode_combinations = [
690        ("claude-opus-4", CompletionMode::Max),
691        ("claude-opus-4", CompletionMode::Normal),
692        ("claude-sonnet-4", CompletionMode::Max),
693        ("claude-sonnet-4", CompletionMode::Normal),
694        ("claude-3-7-sonnet", CompletionMode::Max),
695        ("claude-3-7-sonnet", CompletionMode::Normal),
696        ("claude-3-5-sonnet", CompletionMode::Normal),
697    ];
698
699    let billing_subscription_count = billing_subscriptions.len();
700
701    log::info!("Stripe usage sync: Syncing {billing_subscription_count} Zed Pro subscriptions");
702
703    for (user_id, (billing_customer, billing_subscription)) in billing_subscriptions {
704        maybe!(async {
705            if staff_user_ids.contains(&user_id) {
706                return anyhow::Ok(());
707            }
708
709            let stripe_customer_id =
710                StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
711            let stripe_subscription_id =
712                StripeSubscriptionId(billing_subscription.stripe_subscription_id.clone().into());
713
714            let usage_meters = usage_meters_by_user_id.get(&user_id);
715
716            for (model, mode) in &model_mode_combinations {
717                let Ok(model) =
718                    llm_db.model(LanguageModelProvider::Anthropic, model)
719                else {
720                    log::warn!("Failed to load model for user {user_id}: {model}");
721                    continue;
722                };
723
724                let (price, meter_event_name) = match model.name.as_str() {
725                    "claude-opus-4" => match mode {
726                        CompletionMode::Normal => (&claude_opus_4, "claude_opus_4/requests"),
727                        CompletionMode::Max => (&claude_opus_4_max, "claude_opus_4/requests/max"),
728                    },
729                    "claude-sonnet-4" => match mode {
730                        CompletionMode::Normal => (&claude_sonnet_4, "claude_sonnet_4/requests"),
731                        CompletionMode::Max => {
732                            (&claude_sonnet_4_max, "claude_sonnet_4/requests/max")
733                        }
734                    },
735                    "claude-3-5-sonnet" => (&claude_3_5_sonnet, "claude_3_5_sonnet/requests"),
736                    "claude-3-7-sonnet" => match mode {
737                        CompletionMode::Normal => {
738                            (&claude_3_7_sonnet, "claude_3_7_sonnet/requests")
739                        }
740                        CompletionMode::Max => {
741                            (&claude_3_7_sonnet_max, "claude_3_7_sonnet/requests/max")
742                        }
743                    },
744                    model_name => {
745                        bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
746                    }
747                };
748
749                let model_requests = usage_meters
750                    .and_then(|usage_meters| {
751                        usage_meters
752                            .iter()
753                            .find(|meter| meter.model_id == model.id && meter.mode == *mode)
754                    })
755                    .map(|usage_meter| usage_meter.requests)
756                    .unwrap_or(0);
757
758                if model_requests > 0 {
759                    stripe_billing
760                        .subscribe_to_price(&stripe_subscription_id, price)
761                        .await?;
762                }
763
764                stripe_billing
765                    .bill_model_request_usage(&stripe_customer_id, meter_event_name, model_requests)
766                    .await
767                    .with_context(|| {
768                        format!(
769                            "Failed to bill model request usage of {model_requests} for {stripe_customer_id}: {meter_event_name}",
770                        )
771                    })?;
772            }
773
774            Ok(())
775        })
776        .await
777        .log_err();
778    }
779
780    log::info!(
781        "Stripe usage sync: Synced {billing_subscription_count} Zed Pro subscriptions in {}",
782        Utc::now() - started_at
783    );
784
785    Ok(())
786}