1use anyhow::{Context as _, bail};
2use axum::{Extension, Json, Router, extract, routing::post};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet};
5use reqwest::StatusCode;
6use sea_orm::ActiveValue;
7use serde::{Deserialize, Serialize};
8use std::{sync::Arc, time::Duration};
9use stripe::{CancellationDetailsReason, EventObject, EventType, ListEvents, SubscriptionStatus};
10use util::{ResultExt, maybe};
11use zed_llm_client::LanguageModelProvider;
12
13use crate::db::billing_subscription::{
14 StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
15};
16use crate::llm::db::subscription_usage_meter::{self, CompletionMode};
17use crate::rpc::{ResultExt as _, Server};
18use crate::stripe_client::{
19 StripeCancellationDetailsReason, StripeClient, StripeCustomerId, StripeSubscription,
20 StripeSubscriptionId,
21};
22use crate::{AppState, Error, Result};
23use crate::{db::UserId, llm::db::LlmDatabase};
24use crate::{
25 db::{
26 CreateBillingCustomerParams, CreateBillingSubscriptionParams,
27 CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
28 UpdateBillingSubscriptionParams, billing_customer,
29 },
30 stripe_billing::StripeBilling,
31};
32
33pub fn router() -> Router {
34 Router::new().route(
35 "/billing/subscriptions/sync",
36 post(sync_billing_subscription),
37 )
38}
39
40#[derive(Debug, Deserialize)]
41struct SyncBillingSubscriptionBody {
42 github_user_id: i32,
43}
44
45#[derive(Debug, Serialize)]
46struct SyncBillingSubscriptionResponse {
47 stripe_customer_id: String,
48}
49
50async fn sync_billing_subscription(
51 Extension(app): Extension<Arc<AppState>>,
52 extract::Json(body): extract::Json<SyncBillingSubscriptionBody>,
53) -> Result<Json<SyncBillingSubscriptionResponse>> {
54 let Some(stripe_client) = app.stripe_client.clone() else {
55 log::error!("failed to retrieve Stripe client");
56 Err(Error::http(
57 StatusCode::NOT_IMPLEMENTED,
58 "not supported".into(),
59 ))?
60 };
61
62 let user = app
63 .db
64 .get_user_by_github_user_id(body.github_user_id)
65 .await?
66 .context("user not found")?;
67
68 let billing_customer = app
69 .db
70 .get_billing_customer_by_user_id(user.id)
71 .await?
72 .context("billing customer not found")?;
73 let stripe_customer_id = StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
74
75 let subscriptions = stripe_client
76 .list_subscriptions_for_customer(&stripe_customer_id)
77 .await?;
78
79 for subscription in subscriptions {
80 let subscription_id = subscription.id.clone();
81
82 sync_subscription(&app, &stripe_client, subscription)
83 .await
84 .with_context(|| {
85 format!(
86 "failed to sync subscription {subscription_id} for user {}",
87 user.id,
88 )
89 })?;
90 }
91
92 Ok(Json(SyncBillingSubscriptionResponse {
93 stripe_customer_id: billing_customer.stripe_customer_id.clone(),
94 }))
95}
96
97/// The amount of time we wait in between each poll of Stripe events.
98///
99/// This value should strike a balance between:
100/// 1. Being short enough that we update quickly when something in Stripe changes
101/// 2. Being long enough that we don't eat into our rate limits.
102///
103/// As a point of reference, the Sequin folks say they have this at **500ms**:
104///
105/// > We poll the Stripe /events endpoint every 500ms per account
106/// >
107/// > — https://blog.sequinstream.com/events-not-webhooks/
108const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
109
110/// The maximum number of events to return per page.
111///
112/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
113///
114/// > Limit can range between 1 and 100, and the default is 10.
115const EVENTS_LIMIT_PER_PAGE: u64 = 100;
116
117/// The number of pages consisting entirely of already-processed events that we
118/// will see before we stop retrieving events.
119///
120/// This is used to prevent over-fetching the Stripe events API for events we've
121/// already seen and processed.
122const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
123
124/// Polls the Stripe events API periodically to reconcile the records in our
125/// database with the data in Stripe.
126pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
127 let Some(real_stripe_client) = app.real_stripe_client.clone() else {
128 log::warn!("failed to retrieve Stripe client");
129 return;
130 };
131 let Some(stripe_client) = app.stripe_client.clone() else {
132 log::warn!("failed to retrieve Stripe client");
133 return;
134 };
135
136 let executor = app.executor.clone();
137 executor.spawn_detached({
138 let executor = executor.clone();
139 async move {
140 loop {
141 poll_stripe_events(&app, &rpc_server, &stripe_client, &real_stripe_client)
142 .await
143 .log_err();
144
145 executor.sleep(POLL_EVENTS_INTERVAL).await;
146 }
147 }
148 });
149}
150
151async fn poll_stripe_events(
152 app: &Arc<AppState>,
153 rpc_server: &Arc<Server>,
154 stripe_client: &Arc<dyn StripeClient>,
155 real_stripe_client: &stripe::Client,
156) -> anyhow::Result<()> {
157 fn event_type_to_string(event_type: EventType) -> String {
158 // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
159 // so we need to unquote it.
160 event_type.to_string().trim_matches('"').to_string()
161 }
162
163 let event_types = [
164 EventType::CustomerCreated,
165 EventType::CustomerUpdated,
166 EventType::CustomerSubscriptionCreated,
167 EventType::CustomerSubscriptionUpdated,
168 EventType::CustomerSubscriptionPaused,
169 EventType::CustomerSubscriptionResumed,
170 EventType::CustomerSubscriptionDeleted,
171 ]
172 .into_iter()
173 .map(event_type_to_string)
174 .collect::<Vec<_>>();
175
176 let mut pages_of_already_processed_events = 0;
177 let mut unprocessed_events = Vec::new();
178
179 log::info!(
180 "Stripe events: starting retrieval for {}",
181 event_types.join(", ")
182 );
183 let mut params = ListEvents::new();
184 params.types = Some(event_types.clone());
185 params.limit = Some(EVENTS_LIMIT_PER_PAGE);
186
187 let mut event_pages = stripe::Event::list(&real_stripe_client, ¶ms)
188 .await?
189 .paginate(params);
190
191 loop {
192 let processed_event_ids = {
193 let event_ids = event_pages
194 .page
195 .data
196 .iter()
197 .map(|event| event.id.as_str())
198 .collect::<Vec<_>>();
199 app.db
200 .get_processed_stripe_events_by_event_ids(&event_ids)
201 .await?
202 .into_iter()
203 .map(|event| event.stripe_event_id)
204 .collect::<Vec<_>>()
205 };
206
207 let mut processed_events_in_page = 0;
208 let events_in_page = event_pages.page.data.len();
209 for event in &event_pages.page.data {
210 if processed_event_ids.contains(&event.id.to_string()) {
211 processed_events_in_page += 1;
212 log::debug!("Stripe events: already processed '{}', skipping", event.id);
213 } else {
214 unprocessed_events.push(event.clone());
215 }
216 }
217
218 if processed_events_in_page == events_in_page {
219 pages_of_already_processed_events += 1;
220 }
221
222 if event_pages.page.has_more {
223 if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
224 {
225 log::info!(
226 "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
227 );
228 break;
229 } else {
230 log::info!("Stripe events: retrieving next page");
231 event_pages = event_pages.next(&real_stripe_client).await?;
232 }
233 } else {
234 break;
235 }
236 }
237
238 log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
239
240 // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
241 unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
242
243 for event in unprocessed_events {
244 let event_id = event.id.clone();
245 let processed_event_params = CreateProcessedStripeEventParams {
246 stripe_event_id: event.id.to_string(),
247 stripe_event_type: event_type_to_string(event.type_),
248 stripe_event_created_timestamp: event.created,
249 };
250
251 // If the event has happened too far in the past, we don't want to
252 // process it and risk overwriting other more-recent updates.
253 //
254 // 1 day was chosen arbitrarily. This could be made longer or shorter.
255 let one_day = Duration::from_secs(24 * 60 * 60);
256 let a_day_ago = Utc::now() - one_day;
257 if a_day_ago.timestamp() > event.created {
258 log::info!(
259 "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
260 event_id
261 );
262 app.db
263 .create_processed_stripe_event(&processed_event_params)
264 .await?;
265
266 continue;
267 }
268
269 let process_result = match event.type_ {
270 EventType::CustomerCreated | EventType::CustomerUpdated => {
271 handle_customer_event(app, real_stripe_client, event).await
272 }
273 EventType::CustomerSubscriptionCreated
274 | EventType::CustomerSubscriptionUpdated
275 | EventType::CustomerSubscriptionPaused
276 | EventType::CustomerSubscriptionResumed
277 | EventType::CustomerSubscriptionDeleted => {
278 handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
279 }
280 _ => Ok(()),
281 };
282
283 if let Some(()) = process_result
284 .with_context(|| format!("failed to process event {event_id} successfully"))
285 .log_err()
286 {
287 app.db
288 .create_processed_stripe_event(&processed_event_params)
289 .await?;
290 }
291 }
292
293 Ok(())
294}
295
296async fn handle_customer_event(
297 app: &Arc<AppState>,
298 _stripe_client: &stripe::Client,
299 event: stripe::Event,
300) -> anyhow::Result<()> {
301 let EventObject::Customer(customer) = event.data.object else {
302 bail!("unexpected event payload for {}", event.id);
303 };
304
305 log::info!("handling Stripe {} event: {}", event.type_, event.id);
306
307 let Some(email) = customer.email else {
308 log::info!("Stripe customer has no email: skipping");
309 return Ok(());
310 };
311
312 let Some(user) = app.db.get_user_by_email(&email).await? else {
313 log::info!("no user found for email: skipping");
314 return Ok(());
315 };
316
317 if let Some(existing_customer) = app
318 .db
319 .get_billing_customer_by_stripe_customer_id(&customer.id)
320 .await?
321 {
322 app.db
323 .update_billing_customer(
324 existing_customer.id,
325 &UpdateBillingCustomerParams {
326 // For now we just leave the information as-is, as it is not
327 // likely to change.
328 ..Default::default()
329 },
330 )
331 .await?;
332 } else {
333 app.db
334 .create_billing_customer(&CreateBillingCustomerParams {
335 user_id: user.id,
336 stripe_customer_id: customer.id.to_string(),
337 })
338 .await?;
339 }
340
341 Ok(())
342}
343
344async fn sync_subscription(
345 app: &Arc<AppState>,
346 stripe_client: &Arc<dyn StripeClient>,
347 subscription: StripeSubscription,
348) -> anyhow::Result<billing_customer::Model> {
349 let subscription_kind = if let Some(stripe_billing) = &app.stripe_billing {
350 stripe_billing
351 .determine_subscription_kind(&subscription)
352 .await
353 } else {
354 None
355 };
356
357 let billing_customer =
358 find_or_create_billing_customer(app, stripe_client.as_ref(), &subscription.customer)
359 .await?
360 .context("billing customer not found")?;
361
362 if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
363 if subscription.status == SubscriptionStatus::Trialing {
364 let current_period_start =
365 DateTime::from_timestamp(subscription.current_period_start, 0)
366 .context("No trial subscription period start")?;
367
368 app.db
369 .update_billing_customer(
370 billing_customer.id,
371 &UpdateBillingCustomerParams {
372 trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
373 ..Default::default()
374 },
375 )
376 .await?;
377 }
378 }
379
380 let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
381 && subscription
382 .cancellation_details
383 .as_ref()
384 .and_then(|details| details.reason)
385 .map_or(false, |reason| {
386 reason == StripeCancellationDetailsReason::PaymentFailed
387 });
388
389 if was_canceled_due_to_payment_failure {
390 app.db
391 .update_billing_customer(
392 billing_customer.id,
393 &UpdateBillingCustomerParams {
394 has_overdue_invoices: ActiveValue::set(true),
395 ..Default::default()
396 },
397 )
398 .await?;
399 }
400
401 if let Some(existing_subscription) = app
402 .db
403 .get_billing_subscription_by_stripe_subscription_id(subscription.id.0.as_ref())
404 .await?
405 {
406 app.db
407 .update_billing_subscription(
408 existing_subscription.id,
409 &UpdateBillingSubscriptionParams {
410 billing_customer_id: ActiveValue::set(billing_customer.id),
411 kind: ActiveValue::set(subscription_kind),
412 stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
413 stripe_subscription_status: ActiveValue::set(subscription.status.into()),
414 stripe_cancel_at: ActiveValue::set(
415 subscription
416 .cancel_at
417 .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
418 .map(|time| time.naive_utc()),
419 ),
420 stripe_cancellation_reason: ActiveValue::set(
421 subscription
422 .cancellation_details
423 .and_then(|details| details.reason)
424 .map(|reason| reason.into()),
425 ),
426 stripe_current_period_start: ActiveValue::set(Some(
427 subscription.current_period_start,
428 )),
429 stripe_current_period_end: ActiveValue::set(Some(
430 subscription.current_period_end,
431 )),
432 },
433 )
434 .await?;
435 } else {
436 if let Some(existing_subscription) = app
437 .db
438 .get_active_billing_subscription(billing_customer.user_id)
439 .await?
440 {
441 if existing_subscription.kind == Some(SubscriptionKind::ZedFree)
442 && subscription_kind == Some(SubscriptionKind::ZedProTrial)
443 {
444 let stripe_subscription_id = StripeSubscriptionId(
445 existing_subscription.stripe_subscription_id.clone().into(),
446 );
447
448 stripe_client
449 .cancel_subscription(&stripe_subscription_id)
450 .await?;
451 } else {
452 // If the user already has an active billing subscription, ignore the
453 // event and return an `Ok` to signal that it was processed
454 // successfully.
455 //
456 // There is the possibility that this could cause us to not create a
457 // subscription in the following scenario:
458 //
459 // 1. User has an active subscription A
460 // 2. User cancels subscription A
461 // 3. User creates a new subscription B
462 // 4. We process the new subscription B before the cancellation of subscription A
463 // 5. User ends up with no subscriptions
464 //
465 // In theory this situation shouldn't arise as we try to process the events in the order they occur.
466
467 log::info!(
468 "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
469 user_id = billing_customer.user_id,
470 subscription_id = subscription.id
471 );
472 return Ok(billing_customer);
473 }
474 }
475
476 app.db
477 .create_billing_subscription(&CreateBillingSubscriptionParams {
478 billing_customer_id: billing_customer.id,
479 kind: subscription_kind,
480 stripe_subscription_id: subscription.id.to_string(),
481 stripe_subscription_status: subscription.status.into(),
482 stripe_cancellation_reason: subscription
483 .cancellation_details
484 .and_then(|details| details.reason)
485 .map(|reason| reason.into()),
486 stripe_current_period_start: Some(subscription.current_period_start),
487 stripe_current_period_end: Some(subscription.current_period_end),
488 })
489 .await?;
490 }
491
492 if let Some(stripe_billing) = app.stripe_billing.as_ref() {
493 if subscription.status == SubscriptionStatus::Canceled
494 || subscription.status == SubscriptionStatus::Paused
495 {
496 let already_has_active_billing_subscription = app
497 .db
498 .has_active_billing_subscription(billing_customer.user_id)
499 .await?;
500 if !already_has_active_billing_subscription {
501 let stripe_customer_id =
502 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
503
504 stripe_billing
505 .subscribe_to_zed_free(stripe_customer_id)
506 .await?;
507 }
508 }
509 }
510
511 Ok(billing_customer)
512}
513
514async fn handle_customer_subscription_event(
515 app: &Arc<AppState>,
516 rpc_server: &Arc<Server>,
517 stripe_client: &Arc<dyn StripeClient>,
518 event: stripe::Event,
519) -> anyhow::Result<()> {
520 let EventObject::Subscription(subscription) = event.data.object else {
521 bail!("unexpected event payload for {}", event.id);
522 };
523
524 log::info!("handling Stripe {} event: {}", event.type_, event.id);
525
526 let billing_customer = sync_subscription(app, stripe_client, subscription.into()).await?;
527
528 // When the user's subscription changes, push down any changes to their plan.
529 rpc_server
530 .update_plan_for_user_legacy(billing_customer.user_id)
531 .await
532 .trace_err();
533
534 // When the user's subscription changes, we want to refresh their LLM tokens
535 // to either grant/revoke access.
536 rpc_server
537 .refresh_llm_tokens_for_user(billing_customer.user_id)
538 .await;
539
540 Ok(())
541}
542
543impl From<SubscriptionStatus> for StripeSubscriptionStatus {
544 fn from(value: SubscriptionStatus) -> Self {
545 match value {
546 SubscriptionStatus::Incomplete => Self::Incomplete,
547 SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
548 SubscriptionStatus::Trialing => Self::Trialing,
549 SubscriptionStatus::Active => Self::Active,
550 SubscriptionStatus::PastDue => Self::PastDue,
551 SubscriptionStatus::Canceled => Self::Canceled,
552 SubscriptionStatus::Unpaid => Self::Unpaid,
553 SubscriptionStatus::Paused => Self::Paused,
554 }
555 }
556}
557
558impl From<CancellationDetailsReason> for StripeCancellationReason {
559 fn from(value: CancellationDetailsReason) -> Self {
560 match value {
561 CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
562 CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
563 CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
564 }
565 }
566}
567
568/// Finds or creates a billing customer using the provided customer.
569pub async fn find_or_create_billing_customer(
570 app: &Arc<AppState>,
571 stripe_client: &dyn StripeClient,
572 customer_id: &StripeCustomerId,
573) -> anyhow::Result<Option<billing_customer::Model>> {
574 // If we already have a billing customer record associated with the Stripe customer,
575 // there's nothing more we need to do.
576 if let Some(billing_customer) = app
577 .db
578 .get_billing_customer_by_stripe_customer_id(customer_id.0.as_ref())
579 .await?
580 {
581 return Ok(Some(billing_customer));
582 }
583
584 let customer = stripe_client.get_customer(customer_id).await?;
585
586 let Some(email) = customer.email else {
587 return Ok(None);
588 };
589
590 let Some(user) = app.db.get_user_by_email(&email).await? else {
591 return Ok(None);
592 };
593
594 let billing_customer = app
595 .db
596 .create_billing_customer(&CreateBillingCustomerParams {
597 user_id: user.id,
598 stripe_customer_id: customer.id.to_string(),
599 })
600 .await?;
601
602 Ok(Some(billing_customer))
603}
604
605const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
606
607pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
608 let Some(stripe_billing) = app.stripe_billing.clone() else {
609 log::warn!("failed to retrieve Stripe billing object");
610 return;
611 };
612 let Some(llm_db) = app.llm_db.clone() else {
613 log::warn!("failed to retrieve LLM database");
614 return;
615 };
616
617 let executor = app.executor.clone();
618 executor.spawn_detached({
619 let executor = executor.clone();
620 async move {
621 loop {
622 sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
623 .await
624 .context("failed to sync LLM request usage to Stripe")
625 .trace_err();
626 executor
627 .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
628 .await;
629 }
630 }
631 });
632}
633
634async fn sync_model_request_usage_with_stripe(
635 app: &Arc<AppState>,
636 llm_db: &Arc<LlmDatabase>,
637 stripe_billing: &Arc<StripeBilling>,
638) -> anyhow::Result<()> {
639 log::info!("Stripe usage sync: Starting");
640 let started_at = Utc::now();
641
642 let staff_users = app.db.get_staff_users().await?;
643 let staff_user_ids = staff_users
644 .iter()
645 .map(|user| user.id)
646 .collect::<HashSet<UserId>>();
647
648 let usage_meters = llm_db
649 .get_current_subscription_usage_meters(Utc::now())
650 .await?;
651 let mut usage_meters_by_user_id =
652 HashMap::<UserId, Vec<subscription_usage_meter::Model>>::default();
653 for (usage_meter, usage) in usage_meters {
654 let meters = usage_meters_by_user_id.entry(usage.user_id).or_default();
655 meters.push(usage_meter);
656 }
657
658 log::info!("Stripe usage sync: Retrieving Zed Pro subscriptions");
659 let get_zed_pro_subscriptions_started_at = Utc::now();
660 let billing_subscriptions = app.db.get_active_zed_pro_billing_subscriptions().await?;
661 log::info!(
662 "Stripe usage sync: Retrieved {} Zed Pro subscriptions in {}",
663 billing_subscriptions.len(),
664 Utc::now() - get_zed_pro_subscriptions_started_at
665 );
666
667 let claude_sonnet_4 = stripe_billing
668 .find_price_by_lookup_key("claude-sonnet-4-requests")
669 .await?;
670 let claude_sonnet_4_max = stripe_billing
671 .find_price_by_lookup_key("claude-sonnet-4-requests-max")
672 .await?;
673 let claude_opus_4 = stripe_billing
674 .find_price_by_lookup_key("claude-opus-4-requests")
675 .await?;
676 let claude_opus_4_max = stripe_billing
677 .find_price_by_lookup_key("claude-opus-4-requests-max")
678 .await?;
679 let claude_3_5_sonnet = stripe_billing
680 .find_price_by_lookup_key("claude-3-5-sonnet-requests")
681 .await?;
682 let claude_3_7_sonnet = stripe_billing
683 .find_price_by_lookup_key("claude-3-7-sonnet-requests")
684 .await?;
685 let claude_3_7_sonnet_max = stripe_billing
686 .find_price_by_lookup_key("claude-3-7-sonnet-requests-max")
687 .await?;
688
689 let model_mode_combinations = [
690 ("claude-opus-4", CompletionMode::Max),
691 ("claude-opus-4", CompletionMode::Normal),
692 ("claude-sonnet-4", CompletionMode::Max),
693 ("claude-sonnet-4", CompletionMode::Normal),
694 ("claude-3-7-sonnet", CompletionMode::Max),
695 ("claude-3-7-sonnet", CompletionMode::Normal),
696 ("claude-3-5-sonnet", CompletionMode::Normal),
697 ];
698
699 let billing_subscription_count = billing_subscriptions.len();
700
701 log::info!("Stripe usage sync: Syncing {billing_subscription_count} Zed Pro subscriptions");
702
703 for (user_id, (billing_customer, billing_subscription)) in billing_subscriptions {
704 maybe!(async {
705 if staff_user_ids.contains(&user_id) {
706 return anyhow::Ok(());
707 }
708
709 let stripe_customer_id =
710 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
711 let stripe_subscription_id =
712 StripeSubscriptionId(billing_subscription.stripe_subscription_id.clone().into());
713
714 let usage_meters = usage_meters_by_user_id.get(&user_id);
715
716 for (model, mode) in &model_mode_combinations {
717 let Ok(model) =
718 llm_db.model(LanguageModelProvider::Anthropic, model)
719 else {
720 log::warn!("Failed to load model for user {user_id}: {model}");
721 continue;
722 };
723
724 let (price, meter_event_name) = match model.name.as_str() {
725 "claude-opus-4" => match mode {
726 CompletionMode::Normal => (&claude_opus_4, "claude_opus_4/requests"),
727 CompletionMode::Max => (&claude_opus_4_max, "claude_opus_4/requests/max"),
728 },
729 "claude-sonnet-4" => match mode {
730 CompletionMode::Normal => (&claude_sonnet_4, "claude_sonnet_4/requests"),
731 CompletionMode::Max => {
732 (&claude_sonnet_4_max, "claude_sonnet_4/requests/max")
733 }
734 },
735 "claude-3-5-sonnet" => (&claude_3_5_sonnet, "claude_3_5_sonnet/requests"),
736 "claude-3-7-sonnet" => match mode {
737 CompletionMode::Normal => {
738 (&claude_3_7_sonnet, "claude_3_7_sonnet/requests")
739 }
740 CompletionMode::Max => {
741 (&claude_3_7_sonnet_max, "claude_3_7_sonnet/requests/max")
742 }
743 },
744 model_name => {
745 bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
746 }
747 };
748
749 let model_requests = usage_meters
750 .and_then(|usage_meters| {
751 usage_meters
752 .iter()
753 .find(|meter| meter.model_id == model.id && meter.mode == *mode)
754 })
755 .map(|usage_meter| usage_meter.requests)
756 .unwrap_or(0);
757
758 if model_requests > 0 {
759 stripe_billing
760 .subscribe_to_price(&stripe_subscription_id, price)
761 .await?;
762 }
763
764 stripe_billing
765 .bill_model_request_usage(&stripe_customer_id, meter_event_name, model_requests)
766 .await
767 .with_context(|| {
768 format!(
769 "Failed to bill model request usage of {model_requests} for {stripe_customer_id}: {meter_event_name}",
770 )
771 })?;
772 }
773
774 Ok(())
775 })
776 .await
777 .log_err();
778 }
779
780 log::info!(
781 "Stripe usage sync: Synced {billing_subscription_count} Zed Pro subscriptions in {}",
782 Utc::now() - started_at
783 );
784
785 Ok(())
786}