1use anyhow::{Context as _, bail};
2use axum::routing::put;
3use axum::{
4 Extension, Json, Router,
5 extract::{self, Query},
6 routing::{get, post},
7};
8use chrono::{DateTime, SecondsFormat, Utc};
9use collections::{HashMap, HashSet};
10use reqwest::StatusCode;
11use sea_orm::ActiveValue;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use std::{str::FromStr, sync::Arc, time::Duration};
15use stripe::{
16 BillingPortalSession, CancellationDetailsReason, CreateBillingPortalSession,
17 CreateBillingPortalSessionFlowData, CreateBillingPortalSessionFlowDataAfterCompletion,
18 CreateBillingPortalSessionFlowDataAfterCompletionRedirect,
19 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirm,
20 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirmItems,
21 CreateBillingPortalSessionFlowDataType, CustomerId, EventObject, EventType, ListEvents,
22 PaymentMethod, Subscription, SubscriptionId, SubscriptionStatus,
23};
24use util::{ResultExt, maybe};
25use zed_llm_client::LanguageModelProvider;
26
27use crate::api::events::SnowflakeRow;
28use crate::db::billing_subscription::{
29 StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
30};
31use crate::llm::AGENT_EXTENDED_TRIAL_FEATURE_FLAG;
32use crate::llm::db::subscription_usage_meter::{self, CompletionMode};
33use crate::rpc::{ResultExt as _, Server};
34use crate::stripe_client::{
35 StripeCancellationDetailsReason, StripeClient, StripeCustomerId, StripeSubscription,
36 StripeSubscriptionId, UpdateCustomerParams,
37};
38use crate::{AppState, Error, Result};
39use crate::{db::UserId, llm::db::LlmDatabase};
40use crate::{
41 db::{
42 BillingSubscriptionId, CreateBillingCustomerParams, CreateBillingSubscriptionParams,
43 CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
44 UpdateBillingPreferencesParams, UpdateBillingSubscriptionParams, billing_customer,
45 },
46 stripe_billing::StripeBilling,
47};
48
49pub fn router() -> Router {
50 Router::new()
51 .route("/billing/preferences", put(update_billing_preferences))
52 .route("/billing/subscriptions", post(create_billing_subscription))
53 .route(
54 "/billing/subscriptions/manage",
55 post(manage_billing_subscription),
56 )
57 .route(
58 "/billing/subscriptions/sync",
59 post(sync_billing_subscription),
60 )
61 .route("/billing/usage", get(get_current_usage))
62}
63
64#[derive(Debug, Serialize)]
65struct BillingPreferencesResponse {
66 trial_started_at: Option<String>,
67 max_monthly_llm_usage_spending_in_cents: i32,
68 model_request_overages_enabled: bool,
69 model_request_overages_spend_limit_in_cents: i32,
70}
71
72#[derive(Debug, Deserialize)]
73struct UpdateBillingPreferencesBody {
74 github_user_id: i32,
75 #[serde(default)]
76 max_monthly_llm_usage_spending_in_cents: i32,
77 #[serde(default)]
78 model_request_overages_enabled: bool,
79 #[serde(default)]
80 model_request_overages_spend_limit_in_cents: i32,
81}
82
83async fn update_billing_preferences(
84 Extension(app): Extension<Arc<AppState>>,
85 Extension(rpc_server): Extension<Arc<crate::rpc::Server>>,
86 extract::Json(body): extract::Json<UpdateBillingPreferencesBody>,
87) -> Result<Json<BillingPreferencesResponse>> {
88 let user = app
89 .db
90 .get_user_by_github_user_id(body.github_user_id)
91 .await?
92 .context("user not found")?;
93
94 let billing_customer = app.db.get_billing_customer_by_user_id(user.id).await?;
95
96 let max_monthly_llm_usage_spending_in_cents =
97 body.max_monthly_llm_usage_spending_in_cents.max(0);
98 let model_request_overages_spend_limit_in_cents =
99 body.model_request_overages_spend_limit_in_cents.max(0);
100
101 let billing_preferences =
102 if let Some(_billing_preferences) = app.db.get_billing_preferences(user.id).await? {
103 app.db
104 .update_billing_preferences(
105 user.id,
106 &UpdateBillingPreferencesParams {
107 max_monthly_llm_usage_spending_in_cents: ActiveValue::set(
108 max_monthly_llm_usage_spending_in_cents,
109 ),
110 model_request_overages_enabled: ActiveValue::set(
111 body.model_request_overages_enabled,
112 ),
113 model_request_overages_spend_limit_in_cents: ActiveValue::set(
114 model_request_overages_spend_limit_in_cents,
115 ),
116 },
117 )
118 .await?
119 } else {
120 app.db
121 .create_billing_preferences(
122 user.id,
123 &crate::db::CreateBillingPreferencesParams {
124 max_monthly_llm_usage_spending_in_cents,
125 model_request_overages_enabled: body.model_request_overages_enabled,
126 model_request_overages_spend_limit_in_cents,
127 },
128 )
129 .await?
130 };
131
132 SnowflakeRow::new(
133 "Billing Preferences Updated",
134 Some(user.metrics_id),
135 user.admin,
136 None,
137 json!({
138 "user_id": user.id,
139 "model_request_overages_enabled": billing_preferences.model_request_overages_enabled,
140 "model_request_overages_spend_limit_in_cents": billing_preferences.model_request_overages_spend_limit_in_cents,
141 "max_monthly_llm_usage_spending_in_cents": billing_preferences.max_monthly_llm_usage_spending_in_cents,
142 }),
143 )
144 .write(&app.kinesis_client, &app.config.kinesis_stream)
145 .await
146 .log_err();
147
148 rpc_server.refresh_llm_tokens_for_user(user.id).await;
149
150 Ok(Json(BillingPreferencesResponse {
151 trial_started_at: billing_customer
152 .and_then(|billing_customer| billing_customer.trial_started_at)
153 .map(|trial_started_at| {
154 trial_started_at
155 .and_utc()
156 .to_rfc3339_opts(SecondsFormat::Millis, true)
157 }),
158 max_monthly_llm_usage_spending_in_cents: billing_preferences
159 .max_monthly_llm_usage_spending_in_cents,
160 model_request_overages_enabled: billing_preferences.model_request_overages_enabled,
161 model_request_overages_spend_limit_in_cents: billing_preferences
162 .model_request_overages_spend_limit_in_cents,
163 }))
164}
165
166#[derive(Debug, PartialEq, Clone, Copy, Deserialize)]
167#[serde(rename_all = "snake_case")]
168enum ProductCode {
169 ZedPro,
170 ZedProTrial,
171}
172
173#[derive(Debug, Deserialize)]
174struct CreateBillingSubscriptionBody {
175 github_user_id: i32,
176 product: ProductCode,
177}
178
179#[derive(Debug, Serialize)]
180struct CreateBillingSubscriptionResponse {
181 checkout_session_url: String,
182}
183
184/// Initiates a Stripe Checkout session for creating a billing subscription.
185async fn create_billing_subscription(
186 Extension(app): Extension<Arc<AppState>>,
187 extract::Json(body): extract::Json<CreateBillingSubscriptionBody>,
188) -> Result<Json<CreateBillingSubscriptionResponse>> {
189 let user = app
190 .db
191 .get_user_by_github_user_id(body.github_user_id)
192 .await?
193 .context("user not found")?;
194
195 let Some(stripe_billing) = app.stripe_billing.clone() else {
196 log::error!("failed to retrieve Stripe billing object");
197 Err(Error::http(
198 StatusCode::NOT_IMPLEMENTED,
199 "not supported".into(),
200 ))?
201 };
202
203 if let Some(existing_subscription) = app.db.get_active_billing_subscription(user.id).await? {
204 let is_checkout_allowed = body.product == ProductCode::ZedProTrial
205 && existing_subscription.kind == Some(SubscriptionKind::ZedFree);
206
207 if !is_checkout_allowed {
208 return Err(Error::http(
209 StatusCode::CONFLICT,
210 "user already has an active subscription".into(),
211 ));
212 }
213 }
214
215 let existing_billing_customer = app.db.get_billing_customer_by_user_id(user.id).await?;
216 if let Some(existing_billing_customer) = &existing_billing_customer {
217 if existing_billing_customer.has_overdue_invoices {
218 return Err(Error::http(
219 StatusCode::PAYMENT_REQUIRED,
220 "user has overdue invoices".into(),
221 ));
222 }
223 }
224
225 let customer_id = if let Some(existing_customer) = &existing_billing_customer {
226 let customer_id = StripeCustomerId(existing_customer.stripe_customer_id.clone().into());
227 if let Some(email) = user.email_address.as_deref() {
228 stripe_billing
229 .client()
230 .update_customer(&customer_id, UpdateCustomerParams { email: Some(email) })
231 .await
232 // Update of email address is best-effort - continue checkout even if it fails
233 .context("error updating stripe customer email address")
234 .log_err();
235 }
236 customer_id
237 } else {
238 stripe_billing
239 .find_or_create_customer_by_email(user.email_address.as_deref())
240 .await?
241 };
242
243 let success_url = format!(
244 "{}/account?checkout_complete=1",
245 app.config.zed_dot_dev_url()
246 );
247
248 let checkout_session_url = match body.product {
249 ProductCode::ZedPro => {
250 stripe_billing
251 .checkout_with_zed_pro(&customer_id, &user.github_login, &success_url)
252 .await?
253 }
254 ProductCode::ZedProTrial => {
255 if let Some(existing_billing_customer) = &existing_billing_customer {
256 if existing_billing_customer.trial_started_at.is_some() {
257 return Err(Error::http(
258 StatusCode::FORBIDDEN,
259 "user already used free trial".into(),
260 ));
261 }
262 }
263
264 let feature_flags = app.db.get_user_flags(user.id).await?;
265
266 stripe_billing
267 .checkout_with_zed_pro_trial(
268 &customer_id,
269 &user.github_login,
270 feature_flags,
271 &success_url,
272 )
273 .await?
274 }
275 };
276
277 Ok(Json(CreateBillingSubscriptionResponse {
278 checkout_session_url,
279 }))
280}
281
282#[derive(Debug, PartialEq, Deserialize)]
283#[serde(rename_all = "snake_case")]
284enum ManageSubscriptionIntent {
285 /// The user intends to manage their subscription.
286 ///
287 /// This will open the Stripe billing portal without putting the user in a specific flow.
288 ManageSubscription,
289 /// The user intends to update their payment method.
290 UpdatePaymentMethod,
291 /// The user intends to upgrade to Zed Pro.
292 UpgradeToPro,
293 /// The user intends to cancel their subscription.
294 Cancel,
295 /// The user intends to stop the cancellation of their subscription.
296 StopCancellation,
297}
298
299#[derive(Debug, Deserialize)]
300struct ManageBillingSubscriptionBody {
301 github_user_id: i32,
302 intent: ManageSubscriptionIntent,
303 /// The ID of the subscription to manage.
304 subscription_id: BillingSubscriptionId,
305 redirect_to: Option<String>,
306}
307
308#[derive(Debug, Serialize)]
309struct ManageBillingSubscriptionResponse {
310 billing_portal_session_url: Option<String>,
311}
312
313/// Initiates a Stripe customer portal session for managing a billing subscription.
314async fn manage_billing_subscription(
315 Extension(app): Extension<Arc<AppState>>,
316 extract::Json(body): extract::Json<ManageBillingSubscriptionBody>,
317) -> Result<Json<ManageBillingSubscriptionResponse>> {
318 let user = app
319 .db
320 .get_user_by_github_user_id(body.github_user_id)
321 .await?
322 .context("user not found")?;
323
324 let Some(stripe_client) = app.real_stripe_client.clone() else {
325 log::error!("failed to retrieve Stripe client");
326 Err(Error::http(
327 StatusCode::NOT_IMPLEMENTED,
328 "not supported".into(),
329 ))?
330 };
331
332 let Some(stripe_billing) = app.stripe_billing.clone() else {
333 log::error!("failed to retrieve Stripe billing object");
334 Err(Error::http(
335 StatusCode::NOT_IMPLEMENTED,
336 "not supported".into(),
337 ))?
338 };
339
340 let customer = app
341 .db
342 .get_billing_customer_by_user_id(user.id)
343 .await?
344 .context("billing customer not found")?;
345 let customer_id = CustomerId::from_str(&customer.stripe_customer_id)
346 .context("failed to parse customer ID")?;
347
348 let subscription = app
349 .db
350 .get_billing_subscription_by_id(body.subscription_id)
351 .await?
352 .context("subscription not found")?;
353 let subscription_id = SubscriptionId::from_str(&subscription.stripe_subscription_id)
354 .context("failed to parse subscription ID")?;
355
356 if body.intent == ManageSubscriptionIntent::StopCancellation {
357 let updated_stripe_subscription = Subscription::update(
358 &stripe_client,
359 &subscription_id,
360 stripe::UpdateSubscription {
361 cancel_at_period_end: Some(false),
362 ..Default::default()
363 },
364 )
365 .await?;
366
367 app.db
368 .update_billing_subscription(
369 subscription.id,
370 &UpdateBillingSubscriptionParams {
371 stripe_cancel_at: ActiveValue::set(
372 updated_stripe_subscription
373 .cancel_at
374 .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
375 .map(|time| time.naive_utc()),
376 ),
377 ..Default::default()
378 },
379 )
380 .await?;
381
382 return Ok(Json(ManageBillingSubscriptionResponse {
383 billing_portal_session_url: None,
384 }));
385 }
386
387 let flow = match body.intent {
388 ManageSubscriptionIntent::ManageSubscription => None,
389 ManageSubscriptionIntent::UpgradeToPro => {
390 let zed_pro_price_id: stripe::PriceId =
391 stripe_billing.zed_pro_price_id().await?.try_into()?;
392 let zed_free_price_id: stripe::PriceId =
393 stripe_billing.zed_free_price_id().await?.try_into()?;
394
395 let stripe_subscription =
396 Subscription::retrieve(&stripe_client, &subscription_id, &[]).await?;
397
398 let is_on_zed_pro_trial = stripe_subscription.status == SubscriptionStatus::Trialing
399 && stripe_subscription.items.data.iter().any(|item| {
400 item.price
401 .as_ref()
402 .map_or(false, |price| price.id == zed_pro_price_id)
403 });
404 if is_on_zed_pro_trial {
405 let payment_methods = PaymentMethod::list(
406 &stripe_client,
407 &stripe::ListPaymentMethods {
408 customer: Some(stripe_subscription.customer.id()),
409 ..Default::default()
410 },
411 )
412 .await?;
413
414 let has_payment_method = !payment_methods.data.is_empty();
415 if !has_payment_method {
416 return Err(Error::http(
417 StatusCode::BAD_REQUEST,
418 "missing payment method".into(),
419 ));
420 }
421
422 // If the user is already on a Zed Pro trial and wants to upgrade to Pro, we just need to end their trial early.
423 Subscription::update(
424 &stripe_client,
425 &stripe_subscription.id,
426 stripe::UpdateSubscription {
427 trial_end: Some(stripe::Scheduled::now()),
428 ..Default::default()
429 },
430 )
431 .await?;
432
433 return Ok(Json(ManageBillingSubscriptionResponse {
434 billing_portal_session_url: None,
435 }));
436 }
437
438 let subscription_item_to_update = stripe_subscription
439 .items
440 .data
441 .iter()
442 .find_map(|item| {
443 let price = item.price.as_ref()?;
444
445 if price.id == zed_free_price_id {
446 Some(item.id.clone())
447 } else {
448 None
449 }
450 })
451 .context("No subscription item to update")?;
452
453 Some(CreateBillingPortalSessionFlowData {
454 type_: CreateBillingPortalSessionFlowDataType::SubscriptionUpdateConfirm,
455 subscription_update_confirm: Some(
456 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirm {
457 subscription: subscription.stripe_subscription_id,
458 items: vec![
459 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirmItems {
460 id: subscription_item_to_update.to_string(),
461 price: Some(zed_pro_price_id.to_string()),
462 quantity: Some(1),
463 },
464 ],
465 discounts: None,
466 },
467 ),
468 ..Default::default()
469 })
470 }
471 ManageSubscriptionIntent::UpdatePaymentMethod => Some(CreateBillingPortalSessionFlowData {
472 type_: CreateBillingPortalSessionFlowDataType::PaymentMethodUpdate,
473 after_completion: Some(CreateBillingPortalSessionFlowDataAfterCompletion {
474 type_: stripe::CreateBillingPortalSessionFlowDataAfterCompletionType::Redirect,
475 redirect: Some(CreateBillingPortalSessionFlowDataAfterCompletionRedirect {
476 return_url: format!(
477 "{}{path}",
478 app.config.zed_dot_dev_url(),
479 path = body.redirect_to.unwrap_or_else(|| "/account".to_string())
480 ),
481 }),
482 ..Default::default()
483 }),
484 ..Default::default()
485 }),
486 ManageSubscriptionIntent::Cancel => {
487 if subscription.kind == Some(SubscriptionKind::ZedFree) {
488 return Err(Error::http(
489 StatusCode::BAD_REQUEST,
490 "free subscription cannot be canceled".into(),
491 ));
492 }
493
494 Some(CreateBillingPortalSessionFlowData {
495 type_: CreateBillingPortalSessionFlowDataType::SubscriptionCancel,
496 after_completion: Some(CreateBillingPortalSessionFlowDataAfterCompletion {
497 type_: stripe::CreateBillingPortalSessionFlowDataAfterCompletionType::Redirect,
498 redirect: Some(CreateBillingPortalSessionFlowDataAfterCompletionRedirect {
499 return_url: format!("{}/account", app.config.zed_dot_dev_url()),
500 }),
501 ..Default::default()
502 }),
503 subscription_cancel: Some(
504 stripe::CreateBillingPortalSessionFlowDataSubscriptionCancel {
505 subscription: subscription.stripe_subscription_id,
506 retention: None,
507 },
508 ),
509 ..Default::default()
510 })
511 }
512 ManageSubscriptionIntent::StopCancellation => unreachable!(),
513 };
514
515 let mut params = CreateBillingPortalSession::new(customer_id);
516 params.flow_data = flow;
517 let return_url = format!("{}/account", app.config.zed_dot_dev_url());
518 params.return_url = Some(&return_url);
519
520 let session = BillingPortalSession::create(&stripe_client, params).await?;
521
522 Ok(Json(ManageBillingSubscriptionResponse {
523 billing_portal_session_url: Some(session.url),
524 }))
525}
526
527#[derive(Debug, Deserialize)]
528struct SyncBillingSubscriptionBody {
529 github_user_id: i32,
530}
531
532#[derive(Debug, Serialize)]
533struct SyncBillingSubscriptionResponse {
534 stripe_customer_id: String,
535}
536
537async fn sync_billing_subscription(
538 Extension(app): Extension<Arc<AppState>>,
539 extract::Json(body): extract::Json<SyncBillingSubscriptionBody>,
540) -> Result<Json<SyncBillingSubscriptionResponse>> {
541 let Some(stripe_client) = app.stripe_client.clone() else {
542 log::error!("failed to retrieve Stripe client");
543 Err(Error::http(
544 StatusCode::NOT_IMPLEMENTED,
545 "not supported".into(),
546 ))?
547 };
548
549 let user = app
550 .db
551 .get_user_by_github_user_id(body.github_user_id)
552 .await?
553 .context("user not found")?;
554
555 let billing_customer = app
556 .db
557 .get_billing_customer_by_user_id(user.id)
558 .await?
559 .context("billing customer not found")?;
560 let stripe_customer_id = StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
561
562 let subscriptions = stripe_client
563 .list_subscriptions_for_customer(&stripe_customer_id)
564 .await?;
565
566 for subscription in subscriptions {
567 let subscription_id = subscription.id.clone();
568
569 sync_subscription(&app, &stripe_client, subscription)
570 .await
571 .with_context(|| {
572 format!(
573 "failed to sync subscription {subscription_id} for user {}",
574 user.id,
575 )
576 })?;
577 }
578
579 Ok(Json(SyncBillingSubscriptionResponse {
580 stripe_customer_id: billing_customer.stripe_customer_id.clone(),
581 }))
582}
583
584/// The amount of time we wait in between each poll of Stripe events.
585///
586/// This value should strike a balance between:
587/// 1. Being short enough that we update quickly when something in Stripe changes
588/// 2. Being long enough that we don't eat into our rate limits.
589///
590/// As a point of reference, the Sequin folks say they have this at **500ms**:
591///
592/// > We poll the Stripe /events endpoint every 500ms per account
593/// >
594/// > — https://blog.sequinstream.com/events-not-webhooks/
595const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
596
597/// The maximum number of events to return per page.
598///
599/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
600///
601/// > Limit can range between 1 and 100, and the default is 10.
602const EVENTS_LIMIT_PER_PAGE: u64 = 100;
603
604/// The number of pages consisting entirely of already-processed events that we
605/// will see before we stop retrieving events.
606///
607/// This is used to prevent over-fetching the Stripe events API for events we've
608/// already seen and processed.
609const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
610
611/// Polls the Stripe events API periodically to reconcile the records in our
612/// database with the data in Stripe.
613pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
614 let Some(real_stripe_client) = app.real_stripe_client.clone() else {
615 log::warn!("failed to retrieve Stripe client");
616 return;
617 };
618 let Some(stripe_client) = app.stripe_client.clone() else {
619 log::warn!("failed to retrieve Stripe client");
620 return;
621 };
622
623 let executor = app.executor.clone();
624 executor.spawn_detached({
625 let executor = executor.clone();
626 async move {
627 loop {
628 poll_stripe_events(&app, &rpc_server, &stripe_client, &real_stripe_client)
629 .await
630 .log_err();
631
632 executor.sleep(POLL_EVENTS_INTERVAL).await;
633 }
634 }
635 });
636}
637
638async fn poll_stripe_events(
639 app: &Arc<AppState>,
640 rpc_server: &Arc<Server>,
641 stripe_client: &Arc<dyn StripeClient>,
642 real_stripe_client: &stripe::Client,
643) -> anyhow::Result<()> {
644 fn event_type_to_string(event_type: EventType) -> String {
645 // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
646 // so we need to unquote it.
647 event_type.to_string().trim_matches('"').to_string()
648 }
649
650 let event_types = [
651 EventType::CustomerCreated,
652 EventType::CustomerUpdated,
653 EventType::CustomerSubscriptionCreated,
654 EventType::CustomerSubscriptionUpdated,
655 EventType::CustomerSubscriptionPaused,
656 EventType::CustomerSubscriptionResumed,
657 EventType::CustomerSubscriptionDeleted,
658 ]
659 .into_iter()
660 .map(event_type_to_string)
661 .collect::<Vec<_>>();
662
663 let mut pages_of_already_processed_events = 0;
664 let mut unprocessed_events = Vec::new();
665
666 log::info!(
667 "Stripe events: starting retrieval for {}",
668 event_types.join(", ")
669 );
670 let mut params = ListEvents::new();
671 params.types = Some(event_types.clone());
672 params.limit = Some(EVENTS_LIMIT_PER_PAGE);
673
674 let mut event_pages = stripe::Event::list(&real_stripe_client, ¶ms)
675 .await?
676 .paginate(params);
677
678 loop {
679 let processed_event_ids = {
680 let event_ids = event_pages
681 .page
682 .data
683 .iter()
684 .map(|event| event.id.as_str())
685 .collect::<Vec<_>>();
686 app.db
687 .get_processed_stripe_events_by_event_ids(&event_ids)
688 .await?
689 .into_iter()
690 .map(|event| event.stripe_event_id)
691 .collect::<Vec<_>>()
692 };
693
694 let mut processed_events_in_page = 0;
695 let events_in_page = event_pages.page.data.len();
696 for event in &event_pages.page.data {
697 if processed_event_ids.contains(&event.id.to_string()) {
698 processed_events_in_page += 1;
699 log::debug!("Stripe events: already processed '{}', skipping", event.id);
700 } else {
701 unprocessed_events.push(event.clone());
702 }
703 }
704
705 if processed_events_in_page == events_in_page {
706 pages_of_already_processed_events += 1;
707 }
708
709 if event_pages.page.has_more {
710 if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
711 {
712 log::info!(
713 "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
714 );
715 break;
716 } else {
717 log::info!("Stripe events: retrieving next page");
718 event_pages = event_pages.next(&real_stripe_client).await?;
719 }
720 } else {
721 break;
722 }
723 }
724
725 log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
726
727 // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
728 unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
729
730 for event in unprocessed_events {
731 let event_id = event.id.clone();
732 let processed_event_params = CreateProcessedStripeEventParams {
733 stripe_event_id: event.id.to_string(),
734 stripe_event_type: event_type_to_string(event.type_),
735 stripe_event_created_timestamp: event.created,
736 };
737
738 // If the event has happened too far in the past, we don't want to
739 // process it and risk overwriting other more-recent updates.
740 //
741 // 1 day was chosen arbitrarily. This could be made longer or shorter.
742 let one_day = Duration::from_secs(24 * 60 * 60);
743 let a_day_ago = Utc::now() - one_day;
744 if a_day_ago.timestamp() > event.created {
745 log::info!(
746 "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
747 event_id
748 );
749 app.db
750 .create_processed_stripe_event(&processed_event_params)
751 .await?;
752
753 continue;
754 }
755
756 let process_result = match event.type_ {
757 EventType::CustomerCreated | EventType::CustomerUpdated => {
758 handle_customer_event(app, real_stripe_client, event).await
759 }
760 EventType::CustomerSubscriptionCreated
761 | EventType::CustomerSubscriptionUpdated
762 | EventType::CustomerSubscriptionPaused
763 | EventType::CustomerSubscriptionResumed
764 | EventType::CustomerSubscriptionDeleted => {
765 handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
766 }
767 _ => Ok(()),
768 };
769
770 if let Some(()) = process_result
771 .with_context(|| format!("failed to process event {event_id} successfully"))
772 .log_err()
773 {
774 app.db
775 .create_processed_stripe_event(&processed_event_params)
776 .await?;
777 }
778 }
779
780 Ok(())
781}
782
783async fn handle_customer_event(
784 app: &Arc<AppState>,
785 _stripe_client: &stripe::Client,
786 event: stripe::Event,
787) -> anyhow::Result<()> {
788 let EventObject::Customer(customer) = event.data.object else {
789 bail!("unexpected event payload for {}", event.id);
790 };
791
792 log::info!("handling Stripe {} event: {}", event.type_, event.id);
793
794 let Some(email) = customer.email else {
795 log::info!("Stripe customer has no email: skipping");
796 return Ok(());
797 };
798
799 let Some(user) = app.db.get_user_by_email(&email).await? else {
800 log::info!("no user found for email: skipping");
801 return Ok(());
802 };
803
804 if let Some(existing_customer) = app
805 .db
806 .get_billing_customer_by_stripe_customer_id(&customer.id)
807 .await?
808 {
809 app.db
810 .update_billing_customer(
811 existing_customer.id,
812 &UpdateBillingCustomerParams {
813 // For now we just leave the information as-is, as it is not
814 // likely to change.
815 ..Default::default()
816 },
817 )
818 .await?;
819 } else {
820 app.db
821 .create_billing_customer(&CreateBillingCustomerParams {
822 user_id: user.id,
823 stripe_customer_id: customer.id.to_string(),
824 })
825 .await?;
826 }
827
828 Ok(())
829}
830
831async fn sync_subscription(
832 app: &Arc<AppState>,
833 stripe_client: &Arc<dyn StripeClient>,
834 subscription: StripeSubscription,
835) -> anyhow::Result<billing_customer::Model> {
836 let subscription_kind = if let Some(stripe_billing) = &app.stripe_billing {
837 stripe_billing
838 .determine_subscription_kind(&subscription)
839 .await
840 } else {
841 None
842 };
843
844 let billing_customer =
845 find_or_create_billing_customer(app, stripe_client.as_ref(), &subscription.customer)
846 .await?
847 .context("billing customer not found")?;
848
849 if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
850 if subscription.status == SubscriptionStatus::Trialing {
851 let current_period_start =
852 DateTime::from_timestamp(subscription.current_period_start, 0)
853 .context("No trial subscription period start")?;
854
855 app.db
856 .update_billing_customer(
857 billing_customer.id,
858 &UpdateBillingCustomerParams {
859 trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
860 ..Default::default()
861 },
862 )
863 .await?;
864 }
865 }
866
867 let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
868 && subscription
869 .cancellation_details
870 .as_ref()
871 .and_then(|details| details.reason)
872 .map_or(false, |reason| {
873 reason == StripeCancellationDetailsReason::PaymentFailed
874 });
875
876 if was_canceled_due_to_payment_failure {
877 app.db
878 .update_billing_customer(
879 billing_customer.id,
880 &UpdateBillingCustomerParams {
881 has_overdue_invoices: ActiveValue::set(true),
882 ..Default::default()
883 },
884 )
885 .await?;
886 }
887
888 if let Some(existing_subscription) = app
889 .db
890 .get_billing_subscription_by_stripe_subscription_id(subscription.id.0.as_ref())
891 .await?
892 {
893 app.db
894 .update_billing_subscription(
895 existing_subscription.id,
896 &UpdateBillingSubscriptionParams {
897 billing_customer_id: ActiveValue::set(billing_customer.id),
898 kind: ActiveValue::set(subscription_kind),
899 stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
900 stripe_subscription_status: ActiveValue::set(subscription.status.into()),
901 stripe_cancel_at: ActiveValue::set(
902 subscription
903 .cancel_at
904 .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
905 .map(|time| time.naive_utc()),
906 ),
907 stripe_cancellation_reason: ActiveValue::set(
908 subscription
909 .cancellation_details
910 .and_then(|details| details.reason)
911 .map(|reason| reason.into()),
912 ),
913 stripe_current_period_start: ActiveValue::set(Some(
914 subscription.current_period_start,
915 )),
916 stripe_current_period_end: ActiveValue::set(Some(
917 subscription.current_period_end,
918 )),
919 },
920 )
921 .await?;
922 } else {
923 if let Some(existing_subscription) = app
924 .db
925 .get_active_billing_subscription(billing_customer.user_id)
926 .await?
927 {
928 if existing_subscription.kind == Some(SubscriptionKind::ZedFree)
929 && subscription_kind == Some(SubscriptionKind::ZedProTrial)
930 {
931 let stripe_subscription_id = StripeSubscriptionId(
932 existing_subscription.stripe_subscription_id.clone().into(),
933 );
934
935 stripe_client
936 .cancel_subscription(&stripe_subscription_id)
937 .await?;
938 } else {
939 // If the user already has an active billing subscription, ignore the
940 // event and return an `Ok` to signal that it was processed
941 // successfully.
942 //
943 // There is the possibility that this could cause us to not create a
944 // subscription in the following scenario:
945 //
946 // 1. User has an active subscription A
947 // 2. User cancels subscription A
948 // 3. User creates a new subscription B
949 // 4. We process the new subscription B before the cancellation of subscription A
950 // 5. User ends up with no subscriptions
951 //
952 // In theory this situation shouldn't arise as we try to process the events in the order they occur.
953
954 log::info!(
955 "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
956 user_id = billing_customer.user_id,
957 subscription_id = subscription.id
958 );
959 return Ok(billing_customer);
960 }
961 }
962
963 app.db
964 .create_billing_subscription(&CreateBillingSubscriptionParams {
965 billing_customer_id: billing_customer.id,
966 kind: subscription_kind,
967 stripe_subscription_id: subscription.id.to_string(),
968 stripe_subscription_status: subscription.status.into(),
969 stripe_cancellation_reason: subscription
970 .cancellation_details
971 .and_then(|details| details.reason)
972 .map(|reason| reason.into()),
973 stripe_current_period_start: Some(subscription.current_period_start),
974 stripe_current_period_end: Some(subscription.current_period_end),
975 })
976 .await?;
977 }
978
979 if let Some(stripe_billing) = app.stripe_billing.as_ref() {
980 if subscription.status == SubscriptionStatus::Canceled
981 || subscription.status == SubscriptionStatus::Paused
982 {
983 let already_has_active_billing_subscription = app
984 .db
985 .has_active_billing_subscription(billing_customer.user_id)
986 .await?;
987 if !already_has_active_billing_subscription {
988 let stripe_customer_id =
989 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
990
991 stripe_billing
992 .subscribe_to_zed_free(stripe_customer_id)
993 .await?;
994 }
995 }
996 }
997
998 Ok(billing_customer)
999}
1000
1001async fn handle_customer_subscription_event(
1002 app: &Arc<AppState>,
1003 rpc_server: &Arc<Server>,
1004 stripe_client: &Arc<dyn StripeClient>,
1005 event: stripe::Event,
1006) -> anyhow::Result<()> {
1007 let EventObject::Subscription(subscription) = event.data.object else {
1008 bail!("unexpected event payload for {}", event.id);
1009 };
1010
1011 log::info!("handling Stripe {} event: {}", event.type_, event.id);
1012
1013 let billing_customer = sync_subscription(app, stripe_client, subscription.into()).await?;
1014
1015 // When the user's subscription changes, push down any changes to their plan.
1016 rpc_server
1017 .update_plan_for_user(billing_customer.user_id)
1018 .await
1019 .trace_err();
1020
1021 // When the user's subscription changes, we want to refresh their LLM tokens
1022 // to either grant/revoke access.
1023 rpc_server
1024 .refresh_llm_tokens_for_user(billing_customer.user_id)
1025 .await;
1026
1027 Ok(())
1028}
1029
1030#[derive(Debug, Deserialize)]
1031struct GetCurrentUsageParams {
1032 github_user_id: i32,
1033}
1034
1035#[derive(Debug, Serialize)]
1036struct UsageCounts {
1037 pub used: i32,
1038 pub limit: Option<i32>,
1039 pub remaining: Option<i32>,
1040}
1041
1042#[derive(Debug, Serialize)]
1043struct ModelRequestUsage {
1044 pub model: String,
1045 pub mode: CompletionMode,
1046 pub requests: i32,
1047}
1048
1049#[derive(Debug, Serialize)]
1050struct CurrentUsage {
1051 pub model_requests: UsageCounts,
1052 pub model_request_usage: Vec<ModelRequestUsage>,
1053 pub edit_predictions: UsageCounts,
1054}
1055
1056#[derive(Debug, Default, Serialize)]
1057struct GetCurrentUsageResponse {
1058 pub plan: String,
1059 pub current_usage: Option<CurrentUsage>,
1060}
1061
1062async fn get_current_usage(
1063 Extension(app): Extension<Arc<AppState>>,
1064 Query(params): Query<GetCurrentUsageParams>,
1065) -> Result<Json<GetCurrentUsageResponse>> {
1066 let user = app
1067 .db
1068 .get_user_by_github_user_id(params.github_user_id)
1069 .await?
1070 .context("user not found")?;
1071
1072 let feature_flags = app.db.get_user_flags(user.id).await?;
1073 let has_extended_trial = feature_flags
1074 .iter()
1075 .any(|flag| flag == AGENT_EXTENDED_TRIAL_FEATURE_FLAG);
1076
1077 let Some(llm_db) = app.llm_db.clone() else {
1078 return Err(Error::http(
1079 StatusCode::NOT_IMPLEMENTED,
1080 "LLM database not available".into(),
1081 ));
1082 };
1083
1084 let Some(subscription) = app.db.get_active_billing_subscription(user.id).await? else {
1085 return Ok(Json(GetCurrentUsageResponse::default()));
1086 };
1087
1088 let subscription_period = maybe!({
1089 let period_start_at = subscription.current_period_start_at()?;
1090 let period_end_at = subscription.current_period_end_at()?;
1091
1092 Some((period_start_at, period_end_at))
1093 });
1094
1095 let Some((period_start_at, period_end_at)) = subscription_period else {
1096 return Ok(Json(GetCurrentUsageResponse::default()));
1097 };
1098
1099 let usage = llm_db
1100 .get_subscription_usage_for_period(user.id, period_start_at, period_end_at)
1101 .await?;
1102
1103 let plan = subscription
1104 .kind
1105 .map(Into::into)
1106 .unwrap_or(zed_llm_client::Plan::ZedFree);
1107
1108 let model_requests_limit = match plan.model_requests_limit() {
1109 zed_llm_client::UsageLimit::Limited(limit) => {
1110 let limit = if plan == zed_llm_client::Plan::ZedProTrial && has_extended_trial {
1111 1_000
1112 } else {
1113 limit
1114 };
1115
1116 Some(limit)
1117 }
1118 zed_llm_client::UsageLimit::Unlimited => None,
1119 };
1120
1121 let edit_predictions_limit = match plan.edit_predictions_limit() {
1122 zed_llm_client::UsageLimit::Limited(limit) => Some(limit),
1123 zed_llm_client::UsageLimit::Unlimited => None,
1124 };
1125
1126 let Some(usage) = usage else {
1127 return Ok(Json(GetCurrentUsageResponse {
1128 plan: plan.as_str().to_string(),
1129 current_usage: Some(CurrentUsage {
1130 model_requests: UsageCounts {
1131 used: 0,
1132 limit: model_requests_limit,
1133 remaining: model_requests_limit,
1134 },
1135 model_request_usage: Vec::new(),
1136 edit_predictions: UsageCounts {
1137 used: 0,
1138 limit: edit_predictions_limit,
1139 remaining: edit_predictions_limit,
1140 },
1141 }),
1142 }));
1143 };
1144
1145 let subscription_usage_meters = llm_db
1146 .get_current_subscription_usage_meters_for_user(user.id, Utc::now())
1147 .await?;
1148
1149 let model_request_usage = subscription_usage_meters
1150 .into_iter()
1151 .filter_map(|(usage_meter, _usage)| {
1152 let model = llm_db.model_by_id(usage_meter.model_id).ok()?;
1153
1154 Some(ModelRequestUsage {
1155 model: model.name.clone(),
1156 mode: usage_meter.mode,
1157 requests: usage_meter.requests,
1158 })
1159 })
1160 .collect::<Vec<_>>();
1161
1162 Ok(Json(GetCurrentUsageResponse {
1163 plan: plan.as_str().to_string(),
1164 current_usage: Some(CurrentUsage {
1165 model_requests: UsageCounts {
1166 used: usage.model_requests,
1167 limit: model_requests_limit,
1168 remaining: model_requests_limit.map(|limit| (limit - usage.model_requests).max(0)),
1169 },
1170 model_request_usage,
1171 edit_predictions: UsageCounts {
1172 used: usage.edit_predictions,
1173 limit: edit_predictions_limit,
1174 remaining: edit_predictions_limit
1175 .map(|limit| (limit - usage.edit_predictions).max(0)),
1176 },
1177 }),
1178 }))
1179}
1180
1181impl From<SubscriptionStatus> for StripeSubscriptionStatus {
1182 fn from(value: SubscriptionStatus) -> Self {
1183 match value {
1184 SubscriptionStatus::Incomplete => Self::Incomplete,
1185 SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
1186 SubscriptionStatus::Trialing => Self::Trialing,
1187 SubscriptionStatus::Active => Self::Active,
1188 SubscriptionStatus::PastDue => Self::PastDue,
1189 SubscriptionStatus::Canceled => Self::Canceled,
1190 SubscriptionStatus::Unpaid => Self::Unpaid,
1191 SubscriptionStatus::Paused => Self::Paused,
1192 }
1193 }
1194}
1195
1196impl From<CancellationDetailsReason> for StripeCancellationReason {
1197 fn from(value: CancellationDetailsReason) -> Self {
1198 match value {
1199 CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
1200 CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
1201 CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
1202 }
1203 }
1204}
1205
1206/// Finds or creates a billing customer using the provided customer.
1207pub async fn find_or_create_billing_customer(
1208 app: &Arc<AppState>,
1209 stripe_client: &dyn StripeClient,
1210 customer_id: &StripeCustomerId,
1211) -> anyhow::Result<Option<billing_customer::Model>> {
1212 // If we already have a billing customer record associated with the Stripe customer,
1213 // there's nothing more we need to do.
1214 if let Some(billing_customer) = app
1215 .db
1216 .get_billing_customer_by_stripe_customer_id(customer_id.0.as_ref())
1217 .await?
1218 {
1219 return Ok(Some(billing_customer));
1220 }
1221
1222 let customer = stripe_client.get_customer(customer_id).await?;
1223
1224 let Some(email) = customer.email else {
1225 return Ok(None);
1226 };
1227
1228 let Some(user) = app.db.get_user_by_email(&email).await? else {
1229 return Ok(None);
1230 };
1231
1232 let billing_customer = app
1233 .db
1234 .create_billing_customer(&CreateBillingCustomerParams {
1235 user_id: user.id,
1236 stripe_customer_id: customer.id.to_string(),
1237 })
1238 .await?;
1239
1240 Ok(Some(billing_customer))
1241}
1242
1243const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
1244
1245pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
1246 let Some(stripe_billing) = app.stripe_billing.clone() else {
1247 log::warn!("failed to retrieve Stripe billing object");
1248 return;
1249 };
1250 let Some(llm_db) = app.llm_db.clone() else {
1251 log::warn!("failed to retrieve LLM database");
1252 return;
1253 };
1254
1255 let executor = app.executor.clone();
1256 executor.spawn_detached({
1257 let executor = executor.clone();
1258 async move {
1259 loop {
1260 sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
1261 .await
1262 .context("failed to sync LLM request usage to Stripe")
1263 .trace_err();
1264 executor
1265 .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
1266 .await;
1267 }
1268 }
1269 });
1270}
1271
1272async fn sync_model_request_usage_with_stripe(
1273 app: &Arc<AppState>,
1274 llm_db: &Arc<LlmDatabase>,
1275 stripe_billing: &Arc<StripeBilling>,
1276) -> anyhow::Result<()> {
1277 log::info!("Stripe usage sync: Starting");
1278 let started_at = Utc::now();
1279
1280 let staff_users = app.db.get_staff_users().await?;
1281 let staff_user_ids = staff_users
1282 .iter()
1283 .map(|user| user.id)
1284 .collect::<HashSet<UserId>>();
1285
1286 let usage_meters = llm_db
1287 .get_current_subscription_usage_meters(Utc::now())
1288 .await?;
1289 let mut usage_meters_by_user_id =
1290 HashMap::<UserId, Vec<subscription_usage_meter::Model>>::default();
1291 for (usage_meter, usage) in usage_meters {
1292 let meters = usage_meters_by_user_id.entry(usage.user_id).or_default();
1293 meters.push(usage_meter);
1294 }
1295
1296 log::info!("Stripe usage sync: Retrieving Zed Pro subscriptions");
1297 let get_zed_pro_subscriptions_started_at = Utc::now();
1298 let billing_subscriptions = app.db.get_active_zed_pro_billing_subscriptions().await?;
1299 log::info!(
1300 "Stripe usage sync: Retrieved {} Zed Pro subscriptions in {}",
1301 billing_subscriptions.len(),
1302 Utc::now() - get_zed_pro_subscriptions_started_at
1303 );
1304
1305 let claude_sonnet_4 = stripe_billing
1306 .find_price_by_lookup_key("claude-sonnet-4-requests")
1307 .await?;
1308 let claude_sonnet_4_max = stripe_billing
1309 .find_price_by_lookup_key("claude-sonnet-4-requests-max")
1310 .await?;
1311 let claude_opus_4 = stripe_billing
1312 .find_price_by_lookup_key("claude-opus-4-requests")
1313 .await?;
1314 let claude_opus_4_max = stripe_billing
1315 .find_price_by_lookup_key("claude-opus-4-requests-max")
1316 .await?;
1317 let claude_3_5_sonnet = stripe_billing
1318 .find_price_by_lookup_key("claude-3-5-sonnet-requests")
1319 .await?;
1320 let claude_3_7_sonnet = stripe_billing
1321 .find_price_by_lookup_key("claude-3-7-sonnet-requests")
1322 .await?;
1323 let claude_3_7_sonnet_max = stripe_billing
1324 .find_price_by_lookup_key("claude-3-7-sonnet-requests-max")
1325 .await?;
1326
1327 let model_mode_combinations = [
1328 ("claude-opus-4", CompletionMode::Max),
1329 ("claude-opus-4", CompletionMode::Normal),
1330 ("claude-sonnet-4", CompletionMode::Max),
1331 ("claude-sonnet-4", CompletionMode::Normal),
1332 ("claude-3-7-sonnet", CompletionMode::Max),
1333 ("claude-3-7-sonnet", CompletionMode::Normal),
1334 ("claude-3-5-sonnet", CompletionMode::Normal),
1335 ];
1336
1337 let billing_subscription_count = billing_subscriptions.len();
1338
1339 log::info!("Stripe usage sync: Syncing {billing_subscription_count} Zed Pro subscriptions");
1340
1341 for (user_id, (billing_customer, billing_subscription)) in billing_subscriptions {
1342 maybe!(async {
1343 if staff_user_ids.contains(&user_id) {
1344 return anyhow::Ok(());
1345 }
1346
1347 let stripe_customer_id =
1348 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
1349 let stripe_subscription_id =
1350 StripeSubscriptionId(billing_subscription.stripe_subscription_id.clone().into());
1351
1352 let usage_meters = usage_meters_by_user_id.get(&user_id);
1353
1354 for (model, mode) in &model_mode_combinations {
1355 let Ok(model) =
1356 llm_db.model(LanguageModelProvider::Anthropic, model)
1357 else {
1358 log::warn!("Failed to load model for user {user_id}: {model}");
1359 continue;
1360 };
1361
1362 let (price, meter_event_name) = match model.name.as_str() {
1363 "claude-opus-4" => match mode {
1364 CompletionMode::Normal => (&claude_opus_4, "claude_opus_4/requests"),
1365 CompletionMode::Max => (&claude_opus_4_max, "claude_opus_4/requests/max"),
1366 },
1367 "claude-sonnet-4" => match mode {
1368 CompletionMode::Normal => (&claude_sonnet_4, "claude_sonnet_4/requests"),
1369 CompletionMode::Max => {
1370 (&claude_sonnet_4_max, "claude_sonnet_4/requests/max")
1371 }
1372 },
1373 "claude-3-5-sonnet" => (&claude_3_5_sonnet, "claude_3_5_sonnet/requests"),
1374 "claude-3-7-sonnet" => match mode {
1375 CompletionMode::Normal => {
1376 (&claude_3_7_sonnet, "claude_3_7_sonnet/requests")
1377 }
1378 CompletionMode::Max => {
1379 (&claude_3_7_sonnet_max, "claude_3_7_sonnet/requests/max")
1380 }
1381 },
1382 model_name => {
1383 bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
1384 }
1385 };
1386
1387 let model_requests = usage_meters
1388 .and_then(|usage_meters| {
1389 usage_meters
1390 .iter()
1391 .find(|meter| meter.model_id == model.id && meter.mode == *mode)
1392 })
1393 .map(|usage_meter| usage_meter.requests)
1394 .unwrap_or(0);
1395
1396 if model_requests > 0 {
1397 stripe_billing
1398 .subscribe_to_price(&stripe_subscription_id, price)
1399 .await?;
1400 }
1401
1402 stripe_billing
1403 .bill_model_request_usage(&stripe_customer_id, meter_event_name, model_requests)
1404 .await
1405 .with_context(|| {
1406 format!(
1407 "Failed to bill model request usage of {model_requests} for {stripe_customer_id}: {meter_event_name}",
1408 )
1409 })?;
1410 }
1411
1412 Ok(())
1413 })
1414 .await
1415 .log_err();
1416 }
1417
1418 log::info!(
1419 "Stripe usage sync: Synced {billing_subscription_count} Zed Pro subscriptions in {}",
1420 Utc::now() - started_at
1421 );
1422
1423 Ok(())
1424}