1use anyhow::{Context, anyhow, bail};
2use axum::{
3 Extension, Json, Router,
4 extract::{self, Query},
5 routing::{get, post},
6};
7use chrono::{DateTime, SecondsFormat, Utc};
8use collections::HashSet;
9use reqwest::StatusCode;
10use sea_orm::ActiveValue;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::{str::FromStr, sync::Arc, time::Duration};
14use stripe::{
15 BillingPortalSession, CancellationDetailsReason, CreateBillingPortalSession,
16 CreateBillingPortalSessionFlowData, CreateBillingPortalSessionFlowDataAfterCompletion,
17 CreateBillingPortalSessionFlowDataAfterCompletionRedirect,
18 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirm,
19 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirmItems,
20 CreateBillingPortalSessionFlowDataType, CreateCustomer, Customer, CustomerId, EventObject,
21 EventType, Expandable, ListEvents, Subscription, SubscriptionId, SubscriptionStatus,
22};
23use util::{ResultExt, maybe};
24
25use crate::api::events::SnowflakeRow;
26use crate::db::billing_subscription::{
27 StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
28};
29use crate::llm::{DEFAULT_MAX_MONTHLY_SPEND, FREE_TIER_MONTHLY_SPENDING_LIMIT};
30use crate::rpc::{ResultExt as _, Server};
31use crate::{AppState, Cents, Error, Result};
32use crate::{db::UserId, llm::db::LlmDatabase};
33use crate::{
34 db::{
35 BillingSubscriptionId, CreateBillingCustomerParams, CreateBillingSubscriptionParams,
36 CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
37 UpdateBillingPreferencesParams, UpdateBillingSubscriptionParams, billing_customer,
38 },
39 stripe_billing::StripeBilling,
40};
41
42pub fn router() -> Router {
43 Router::new()
44 .route(
45 "/billing/preferences",
46 get(get_billing_preferences).put(update_billing_preferences),
47 )
48 .route(
49 "/billing/subscriptions",
50 get(list_billing_subscriptions).post(create_billing_subscription),
51 )
52 .route(
53 "/billing/subscriptions/manage",
54 post(manage_billing_subscription),
55 )
56 .route("/billing/monthly_spend", get(get_monthly_spend))
57 .route("/billing/usage", get(get_current_usage))
58}
59
/// Query-string parameters accepted by `get_billing_preferences`.
#[derive(Debug, Deserialize)]
struct GetBillingPreferencesParams {
    /// GitHub user ID used to look up the user whose preferences are requested.
    github_user_id: i32,
}
64
/// JSON payload describing a user's billing preferences.
///
/// Returned by both `get_billing_preferences` and `update_billing_preferences`.
#[derive(Debug, Serialize)]
struct BillingPreferencesResponse {
    /// Maximum monthly LLM usage spending, in cents.
    max_monthly_llm_usage_spending_in_cents: i32,
    /// Whether model request overages are enabled for the user.
    model_request_overages_enabled: bool,
    /// Spend limit for model request overages, in cents.
    model_request_overages_spend_limit_in_cents: i32,
}
71
72async fn get_billing_preferences(
73 Extension(app): Extension<Arc<AppState>>,
74 Query(params): Query<GetBillingPreferencesParams>,
75) -> Result<Json<BillingPreferencesResponse>> {
76 let user = app
77 .db
78 .get_user_by_github_user_id(params.github_user_id)
79 .await?
80 .ok_or_else(|| anyhow!("user not found"))?;
81
82 let preferences = app.db.get_billing_preferences(user.id).await?;
83
84 Ok(Json(BillingPreferencesResponse {
85 max_monthly_llm_usage_spending_in_cents: preferences
86 .as_ref()
87 .map_or(DEFAULT_MAX_MONTHLY_SPEND.0 as i32, |preferences| {
88 preferences.max_monthly_llm_usage_spending_in_cents
89 }),
90 model_request_overages_enabled: preferences.as_ref().map_or(false, |preferences| {
91 preferences.model_request_overages_enabled
92 }),
93 model_request_overages_spend_limit_in_cents: preferences
94 .as_ref()
95 .map_or(0, |preferences| {
96 preferences.model_request_overages_spend_limit_in_cents
97 }),
98 }))
99}
100
/// JSON request body accepted by `update_billing_preferences`.
///
/// Every field other than `github_user_id` is optional in the payload; a
/// missing field deserializes to its type's default (`0` / `false`).
#[derive(Debug, Deserialize)]
struct UpdateBillingPreferencesBody {
    /// GitHub user ID used to look up the user whose preferences are updated.
    github_user_id: i32,
    /// Requested maximum monthly LLM usage spending, in cents.
    #[serde(default)]
    max_monthly_llm_usage_spending_in_cents: i32,
    /// Whether model request overages should be enabled.
    #[serde(default)]
    model_request_overages_enabled: bool,
    /// Requested spend limit for model request overages, in cents.
    #[serde(default)]
    model_request_overages_spend_limit_in_cents: i32,
}
111
112async fn update_billing_preferences(
113 Extension(app): Extension<Arc<AppState>>,
114 Extension(rpc_server): Extension<Arc<crate::rpc::Server>>,
115 extract::Json(body): extract::Json<UpdateBillingPreferencesBody>,
116) -> Result<Json<BillingPreferencesResponse>> {
117 let user = app
118 .db
119 .get_user_by_github_user_id(body.github_user_id)
120 .await?
121 .ok_or_else(|| anyhow!("user not found"))?;
122
123 let max_monthly_llm_usage_spending_in_cents =
124 body.max_monthly_llm_usage_spending_in_cents.max(0);
125 let model_request_overages_spend_limit_in_cents =
126 body.model_request_overages_spend_limit_in_cents.max(0);
127
128 let billing_preferences =
129 if let Some(_billing_preferences) = app.db.get_billing_preferences(user.id).await? {
130 app.db
131 .update_billing_preferences(
132 user.id,
133 &UpdateBillingPreferencesParams {
134 max_monthly_llm_usage_spending_in_cents: ActiveValue::set(
135 max_monthly_llm_usage_spending_in_cents,
136 ),
137 model_request_overages_enabled: ActiveValue::set(
138 body.model_request_overages_enabled,
139 ),
140 model_request_overages_spend_limit_in_cents: ActiveValue::set(
141 model_request_overages_spend_limit_in_cents,
142 ),
143 },
144 )
145 .await?
146 } else {
147 app.db
148 .create_billing_preferences(
149 user.id,
150 &crate::db::CreateBillingPreferencesParams {
151 max_monthly_llm_usage_spending_in_cents,
152 model_request_overages_enabled: body.model_request_overages_enabled,
153 model_request_overages_spend_limit_in_cents,
154 },
155 )
156 .await?
157 };
158
159 SnowflakeRow::new(
160 "Billing Preferences Updated",
161 Some(user.metrics_id),
162 user.admin,
163 None,
164 json!({
165 "user_id": user.id,
166 "model_request_overages_enabled": billing_preferences.model_request_overages_enabled,
167 "model_request_overages_spend_limit_in_cents": billing_preferences.model_request_overages_spend_limit_in_cents,
168 "max_monthly_llm_usage_spending_in_cents": billing_preferences.max_monthly_llm_usage_spending_in_cents,
169 }),
170 )
171 .write(&app.kinesis_client, &app.config.kinesis_stream)
172 .await
173 .log_err();
174
175 rpc_server.refresh_llm_tokens_for_user(user.id).await;
176
177 Ok(Json(BillingPreferencesResponse {
178 max_monthly_llm_usage_spending_in_cents: billing_preferences
179 .max_monthly_llm_usage_spending_in_cents,
180 model_request_overages_enabled: billing_preferences.model_request_overages_enabled,
181 model_request_overages_spend_limit_in_cents: billing_preferences
182 .model_request_overages_spend_limit_in_cents,
183 }))
184}
185
/// Query-string parameters accepted by `list_billing_subscriptions`.
#[derive(Debug, Deserialize)]
struct ListBillingSubscriptionsParams {
    /// GitHub user ID used to look up the user whose subscriptions are listed.
    github_user_id: i32,
}
190
/// JSON representation of a single billing subscription, as returned by
/// `list_billing_subscriptions`.
#[derive(Debug, Serialize)]
struct BillingSubscriptionJson {
    /// Our database ID of the subscription.
    id: BillingSubscriptionId,
    /// Human-readable plan name (e.g. "Zed Pro").
    name: String,
    /// Status of the subscription as recorded from Stripe.
    status: StripeSubscriptionStatus,
    /// End of the trial as an RFC 3339 timestamp; only set for Zed Pro trial
    /// subscriptions with a known current period end.
    trial_end_at: Option<String>,
    /// When the subscription is scheduled to be canceled, as an RFC 3339
    /// timestamp, if a cancellation is scheduled.
    cancel_at: Option<String>,
    /// Whether this subscription can be canceled.
    is_cancelable: bool,
}
201
/// JSON response body of `list_billing_subscriptions`.
#[derive(Debug, Serialize)]
struct ListBillingSubscriptionsResponse {
    /// The subscriptions returned by the database for the requested user.
    subscriptions: Vec<BillingSubscriptionJson>,
}
206
207async fn list_billing_subscriptions(
208 Extension(app): Extension<Arc<AppState>>,
209 Query(params): Query<ListBillingSubscriptionsParams>,
210) -> Result<Json<ListBillingSubscriptionsResponse>> {
211 let user = app
212 .db
213 .get_user_by_github_user_id(params.github_user_id)
214 .await?
215 .ok_or_else(|| anyhow!("user not found"))?;
216
217 let subscriptions = app.db.get_billing_subscriptions(user.id).await?;
218
219 Ok(Json(ListBillingSubscriptionsResponse {
220 subscriptions: subscriptions
221 .into_iter()
222 .map(|subscription| BillingSubscriptionJson {
223 id: subscription.id,
224 name: match subscription.kind {
225 Some(SubscriptionKind::ZedPro) => "Zed Pro".to_string(),
226 Some(SubscriptionKind::ZedProTrial) => "Zed Pro (Trial)".to_string(),
227 Some(SubscriptionKind::ZedFree) => "Zed Free".to_string(),
228 None => "Zed LLM Usage".to_string(),
229 },
230 status: subscription.stripe_subscription_status,
231 trial_end_at: if subscription.kind == Some(SubscriptionKind::ZedProTrial) {
232 maybe!({
233 let end_at = subscription.stripe_current_period_end?;
234 let end_at = DateTime::from_timestamp(end_at, 0)?;
235
236 Some(end_at.to_rfc3339_opts(SecondsFormat::Millis, true))
237 })
238 } else {
239 None
240 },
241 cancel_at: subscription.stripe_cancel_at.map(|cancel_at| {
242 cancel_at
243 .and_utc()
244 .to_rfc3339_opts(SecondsFormat::Millis, true)
245 }),
246 is_cancelable: subscription.stripe_subscription_status.is_cancelable()
247 && subscription.stripe_cancel_at.is_none(),
248 })
249 .collect(),
250 }))
251}
252
/// Product a client can request when creating a billing subscription.
///
/// Deserialized from snake_case strings (`"zed_pro"`, `"zed_pro_trial"`).
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ProductCode {
    /// A Zed Pro subscription.
    ZedPro,
    /// A Zed Pro subscription that starts with a trial.
    ZedProTrial,
}
259
/// JSON request body accepted by `create_billing_subscription`.
#[derive(Debug, Deserialize)]
struct CreateBillingSubscriptionBody {
    /// GitHub user ID used to look up the user who is subscribing.
    github_user_id: i32,
    /// The product to check out. When absent, the handler falls back to a
    /// token-based usage checkout.
    product: Option<ProductCode>,
}
265
/// JSON response body of `create_billing_subscription`.
#[derive(Debug, Serialize)]
struct CreateBillingSubscriptionResponse {
    /// URL of the checkout session the user should be sent to.
    checkout_session_url: String,
}
270
271/// Initiates a Stripe Checkout session for creating a billing subscription.
272async fn create_billing_subscription(
273 Extension(app): Extension<Arc<AppState>>,
274 extract::Json(body): extract::Json<CreateBillingSubscriptionBody>,
275) -> Result<Json<CreateBillingSubscriptionResponse>> {
276 let user = app
277 .db
278 .get_user_by_github_user_id(body.github_user_id)
279 .await?
280 .ok_or_else(|| anyhow!("user not found"))?;
281
282 let Some(stripe_client) = app.stripe_client.clone() else {
283 log::error!("failed to retrieve Stripe client");
284 Err(Error::http(
285 StatusCode::NOT_IMPLEMENTED,
286 "not supported".into(),
287 ))?
288 };
289 let Some(stripe_billing) = app.stripe_billing.clone() else {
290 log::error!("failed to retrieve Stripe billing object");
291 Err(Error::http(
292 StatusCode::NOT_IMPLEMENTED,
293 "not supported".into(),
294 ))?
295 };
296 let Some(llm_db) = app.llm_db.clone() else {
297 log::error!("failed to retrieve LLM database");
298 Err(Error::http(
299 StatusCode::NOT_IMPLEMENTED,
300 "not supported".into(),
301 ))?
302 };
303
304 if app.db.has_active_billing_subscription(user.id).await? {
305 return Err(Error::http(
306 StatusCode::CONFLICT,
307 "user already has an active subscription".into(),
308 ));
309 }
310
311 let existing_billing_customer = app.db.get_billing_customer_by_user_id(user.id).await?;
312 if let Some(existing_billing_customer) = &existing_billing_customer {
313 if existing_billing_customer.has_overdue_invoices {
314 return Err(Error::http(
315 StatusCode::PAYMENT_REQUIRED,
316 "user has overdue invoices".into(),
317 ));
318 }
319 }
320
321 let customer_id = if let Some(existing_customer) = &existing_billing_customer {
322 CustomerId::from_str(&existing_customer.stripe_customer_id)
323 .context("failed to parse customer ID")?
324 } else {
325 let existing_customer = if let Some(email) = user.email_address.as_deref() {
326 let customers = Customer::list(
327 &stripe_client,
328 &stripe::ListCustomers {
329 email: Some(email),
330 ..Default::default()
331 },
332 )
333 .await?;
334
335 customers.data.first().cloned()
336 } else {
337 None
338 };
339
340 if let Some(existing_customer) = existing_customer {
341 existing_customer.id
342 } else {
343 let customer = Customer::create(
344 &stripe_client,
345 CreateCustomer {
346 email: user.email_address.as_deref(),
347 ..Default::default()
348 },
349 )
350 .await?;
351
352 customer.id
353 }
354 };
355
356 let success_url = format!(
357 "{}/account?checkout_complete=1",
358 app.config.zed_dot_dev_url()
359 );
360
361 let checkout_session_url = match body.product {
362 Some(ProductCode::ZedPro) => {
363 stripe_billing
364 .checkout_with_price(
365 app.config.zed_pro_price_id()?,
366 customer_id,
367 &user.github_login,
368 &success_url,
369 )
370 .await?
371 }
372 Some(ProductCode::ZedProTrial) => {
373 if let Some(existing_billing_customer) = &existing_billing_customer {
374 if existing_billing_customer.trial_started_at.is_some() {
375 return Err(Error::http(
376 StatusCode::FORBIDDEN,
377 "user already used free trial".into(),
378 ));
379 }
380 }
381
382 stripe_billing
383 .checkout_with_zed_pro_trial(
384 app.config.zed_pro_price_id()?,
385 customer_id,
386 &user.github_login,
387 &success_url,
388 )
389 .await?
390 }
391 None => {
392 let default_model = llm_db.model(
393 zed_llm_client::LanguageModelProvider::Anthropic,
394 "claude-3-7-sonnet",
395 )?;
396 let stripe_model = stripe_billing
397 .register_model_for_token_based_usage(default_model)
398 .await?;
399 stripe_billing
400 .checkout(customer_id, &user.github_login, &stripe_model, &success_url)
401 .await?
402 }
403 };
404
405 Ok(Json(CreateBillingSubscriptionResponse {
406 checkout_session_url,
407 }))
408}
409
/// What the user wants to do with an existing subscription.
///
/// Deserialized from snake_case strings (e.g. `"upgrade_to_pro"`).
#[derive(Debug, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ManageSubscriptionIntent {
    /// The user intends to manage their subscription.
    ///
    /// This will open the Stripe billing portal without putting the user in a specific flow.
    ManageSubscription,
    /// The user intends to upgrade to Zed Pro.
    UpgradeToPro,
    /// The user intends to cancel their subscription.
    Cancel,
    /// The user intends to stop the cancellation of their subscription.
    StopCancellation,
}
424
/// JSON request body accepted by `manage_billing_subscription`.
#[derive(Debug, Deserialize)]
struct ManageBillingSubscriptionBody {
    /// GitHub user ID used to look up the user managing the subscription.
    github_user_id: i32,
    /// The action the user wants to take on the subscription.
    intent: ManageSubscriptionIntent,
    /// The ID of the subscription to manage.
    subscription_id: BillingSubscriptionId,
}
432
/// JSON response body of `manage_billing_subscription`.
#[derive(Debug, Serialize)]
struct ManageBillingSubscriptionResponse {
    /// URL of the Stripe billing portal session, or `None` when the request
    /// was fulfilled directly without needing a portal session.
    billing_portal_session_url: Option<String>,
}
437
/// Initiates a Stripe customer portal session for managing a billing subscription.
///
/// What happens depends on `body.intent`:
///
/// - `StopCancellation`: sets `cancel_at_period_end` to `false` on the Stripe
///   subscription, stores the resulting `cancel_at` in our database, and
///   returns no portal URL.
/// - `UpgradeToPro`: if the subscription is a trialing Zed Pro subscription,
///   the trial is ended immediately and no portal URL is returned. Otherwise a
///   portal session is created in the "confirm subscription update" flow that
///   switches the Zed Free item to the Zed Pro price.
/// - `Cancel`: a portal session is created in the "cancel subscription" flow,
///   redirecting to the account page afterwards.
/// - `ManageSubscription`: a portal session is created without a specific flow.
///
/// # Errors
///
/// Responds with `501 Not Implemented` when no Stripe client is configured.
/// Fails when the user, billing customer, or subscription cannot be found,
/// when a stored Stripe ID cannot be parsed, when an upgrade is requested but
/// no Zed Free item exists on the subscription, or when a database/Stripe call
/// fails.
// NOTE(review): the subscription is looked up by `body.subscription_id` alone;
// nothing in this function checks that it belongs to the user's billing
// customer. Confirm that ownership is enforced elsewhere.
async fn manage_billing_subscription(
    Extension(app): Extension<Arc<AppState>>,
    extract::Json(body): extract::Json<ManageBillingSubscriptionBody>,
) -> Result<Json<ManageBillingSubscriptionResponse>> {
    let user = app
        .db
        .get_user_by_github_user_id(body.github_user_id)
        .await?
        .ok_or_else(|| anyhow!("user not found"))?;

    let Some(stripe_client) = app.stripe_client.clone() else {
        log::error!("failed to retrieve Stripe client");
        Err(Error::http(
            StatusCode::NOT_IMPLEMENTED,
            "not supported".into(),
        ))?
    };

    let customer = app
        .db
        .get_billing_customer_by_user_id(user.id)
        .await?
        .ok_or_else(|| anyhow!("billing customer not found"))?;
    let customer_id = CustomerId::from_str(&customer.stripe_customer_id)
        .context("failed to parse customer ID")?;

    let subscription = app
        .db
        .get_billing_subscription_by_id(body.subscription_id)
        .await?
        .ok_or_else(|| anyhow!("subscription not found"))?;
    let subscription_id = SubscriptionId::from_str(&subscription.stripe_subscription_id)
        .context("failed to parse subscription ID")?;

    // Stopping a cancellation needs no portal session: update Stripe directly
    // and mirror the resulting `cancel_at` into our database.
    if body.intent == ManageSubscriptionIntent::StopCancellation {
        let updated_stripe_subscription = Subscription::update(
            &stripe_client,
            &subscription_id,
            stripe::UpdateSubscription {
                cancel_at_period_end: Some(false),
                ..Default::default()
            },
        )
        .await?;

        app.db
            .update_billing_subscription(
                subscription.id,
                &UpdateBillingSubscriptionParams {
                    stripe_cancel_at: ActiveValue::set(
                        updated_stripe_subscription
                            .cancel_at
                            .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
                            .map(|time| time.naive_utc()),
                    ),
                    ..Default::default()
                },
            )
            .await?;

        return Ok(Json(ManageBillingSubscriptionResponse {
            billing_portal_session_url: None,
        }));
    }

    // Pick the portal flow (if any) that matches the user's intent.
    let flow = match body.intent {
        ManageSubscriptionIntent::ManageSubscription => None,
        ManageSubscriptionIntent::UpgradeToPro => {
            let zed_pro_price_id = app.config.zed_pro_price_id()?;
            let zed_free_price_id = app.config.zed_free_price_id()?;

            let stripe_subscription =
                Subscription::retrieve(&stripe_client, &subscription_id, &[]).await?;

            let is_on_zed_pro_trial = stripe_subscription.status == SubscriptionStatus::Trialing
                && stripe_subscription.items.data.iter().any(|item| {
                    item.price
                        .as_ref()
                        .map_or(false, |price| price.id == zed_pro_price_id)
                });
            if is_on_zed_pro_trial {
                // If the user is already on a Zed Pro trial and wants to upgrade to Pro, we just need to end their trial early.
                Subscription::update(
                    &stripe_client,
                    &stripe_subscription.id,
                    stripe::UpdateSubscription {
                        trial_end: Some(stripe::Scheduled::now()),
                        ..Default::default()
                    },
                )
                .await?;

                return Ok(Json(ManageBillingSubscriptionResponse {
                    billing_portal_session_url: None,
                }));
            }

            // Otherwise the upgrade replaces the Zed Free item with Zed Pro,
            // so we need the ID of the item currently on the Zed Free price.
            let subscription_item_to_update = stripe_subscription
                .items
                .data
                .iter()
                .find_map(|item| {
                    let price = item.price.as_ref()?;

                    if price.id == zed_free_price_id {
                        Some(item.id.clone())
                    } else {
                        None
                    }
                })
                .ok_or_else(|| anyhow!("No subscription item to update"))?;

            Some(CreateBillingPortalSessionFlowData {
                type_: CreateBillingPortalSessionFlowDataType::SubscriptionUpdateConfirm,
                subscription_update_confirm: Some(
                    CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirm {
                        subscription: subscription.stripe_subscription_id,
                        items: vec![
                            CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirmItems {
                                id: subscription_item_to_update.to_string(),
                                price: Some(zed_pro_price_id.to_string()),
                                quantity: Some(1),
                            },
                        ],
                        discounts: None,
                    },
                ),
                ..Default::default()
            })
        }
        ManageSubscriptionIntent::Cancel => Some(CreateBillingPortalSessionFlowData {
            type_: CreateBillingPortalSessionFlowDataType::SubscriptionCancel,
            // Send the user back to their account page once they are done.
            after_completion: Some(CreateBillingPortalSessionFlowDataAfterCompletion {
                type_: stripe::CreateBillingPortalSessionFlowDataAfterCompletionType::Redirect,
                redirect: Some(CreateBillingPortalSessionFlowDataAfterCompletionRedirect {
                    return_url: format!("{}/account", app.config.zed_dot_dev_url()),
                }),
                ..Default::default()
            }),
            subscription_cancel: Some(
                stripe::CreateBillingPortalSessionFlowDataSubscriptionCancel {
                    subscription: subscription.stripe_subscription_id,
                    retention: None,
                },
            ),
            ..Default::default()
        }),
        // Handled by the early return above.
        ManageSubscriptionIntent::StopCancellation => unreachable!(),
    };

    let mut params = CreateBillingPortalSession::new(customer_id);
    params.flow_data = flow;
    let return_url = format!("{}/account", app.config.zed_dot_dev_url());
    params.return_url = Some(&return_url);

    let session = BillingPortalSession::create(&stripe_client, params).await?;

    Ok(Json(ManageBillingSubscriptionResponse {
        billing_portal_session_url: Some(session.url),
    }))
}
600
/// The amount of time we wait in between each poll of Stripe events.
///
/// This value should strike a balance between:
/// 1. Being short enough that we update quickly when something in Stripe changes
/// 2. Being long enough that we don't eat into our rate limits.
///
/// As a point of reference, the Sequin folks say they have this at **500ms**:
///
/// > We poll the Stripe /events endpoint every 500ms per account
/// >
/// > — https://blog.sequinstream.com/events-not-webhooks/
///
/// We currently wait 5 seconds between polls.
const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
613
/// The maximum number of events to return per page when listing Stripe events.
///
/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
///
/// > Limit can range between 1 and 100, and the default is 10.
const EVENTS_LIMIT_PER_PAGE: u64 = 100;
620
/// The number of pages consisting entirely of already-processed events that we
/// will see before we stop retrieving events.
///
/// This is used to prevent over-fetching the Stripe events API for events we've
/// already seen and processed. The count is cumulative over a single poll; the
/// pages do not need to be consecutive.
const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
627
628/// Polls the Stripe events API periodically to reconcile the records in our
629/// database with the data in Stripe.
630pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
631 let Some(stripe_client) = app.stripe_client.clone() else {
632 log::warn!("failed to retrieve Stripe client");
633 return;
634 };
635
636 let executor = app.executor.clone();
637 executor.spawn_detached({
638 let executor = executor.clone();
639 async move {
640 loop {
641 poll_stripe_events(&app, &rpc_server, &stripe_client)
642 .await
643 .log_err();
644
645 executor.sleep(POLL_EVENTS_INTERVAL).await;
646 }
647 }
648 });
649}
650
651async fn poll_stripe_events(
652 app: &Arc<AppState>,
653 rpc_server: &Arc<Server>,
654 stripe_client: &stripe::Client,
655) -> anyhow::Result<()> {
656 fn event_type_to_string(event_type: EventType) -> String {
657 // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
658 // so we need to unquote it.
659 event_type.to_string().trim_matches('"').to_string()
660 }
661
662 let event_types = [
663 EventType::CustomerCreated,
664 EventType::CustomerUpdated,
665 EventType::CustomerSubscriptionCreated,
666 EventType::CustomerSubscriptionUpdated,
667 EventType::CustomerSubscriptionPaused,
668 EventType::CustomerSubscriptionResumed,
669 EventType::CustomerSubscriptionDeleted,
670 ]
671 .into_iter()
672 .map(event_type_to_string)
673 .collect::<Vec<_>>();
674
675 let mut pages_of_already_processed_events = 0;
676 let mut unprocessed_events = Vec::new();
677
678 log::info!(
679 "Stripe events: starting retrieval for {}",
680 event_types.join(", ")
681 );
682 let mut params = ListEvents::new();
683 params.types = Some(event_types.clone());
684 params.limit = Some(EVENTS_LIMIT_PER_PAGE);
685
686 let mut event_pages = stripe::Event::list(&stripe_client, ¶ms)
687 .await?
688 .paginate(params);
689
690 loop {
691 let processed_event_ids = {
692 let event_ids = event_pages
693 .page
694 .data
695 .iter()
696 .map(|event| event.id.as_str())
697 .collect::<Vec<_>>();
698 app.db
699 .get_processed_stripe_events_by_event_ids(&event_ids)
700 .await?
701 .into_iter()
702 .map(|event| event.stripe_event_id)
703 .collect::<Vec<_>>()
704 };
705
706 let mut processed_events_in_page = 0;
707 let events_in_page = event_pages.page.data.len();
708 for event in &event_pages.page.data {
709 if processed_event_ids.contains(&event.id.to_string()) {
710 processed_events_in_page += 1;
711 log::debug!("Stripe events: already processed '{}', skipping", event.id);
712 } else {
713 unprocessed_events.push(event.clone());
714 }
715 }
716
717 if processed_events_in_page == events_in_page {
718 pages_of_already_processed_events += 1;
719 }
720
721 if event_pages.page.has_more {
722 if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
723 {
724 log::info!(
725 "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
726 );
727 break;
728 } else {
729 log::info!("Stripe events: retrieving next page");
730 event_pages = event_pages.next(&stripe_client).await?;
731 }
732 } else {
733 break;
734 }
735 }
736
737 log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
738
739 // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
740 unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
741
742 for event in unprocessed_events {
743 let event_id = event.id.clone();
744 let processed_event_params = CreateProcessedStripeEventParams {
745 stripe_event_id: event.id.to_string(),
746 stripe_event_type: event_type_to_string(event.type_),
747 stripe_event_created_timestamp: event.created,
748 };
749
750 // If the event has happened too far in the past, we don't want to
751 // process it and risk overwriting other more-recent updates.
752 //
753 // 1 day was chosen arbitrarily. This could be made longer or shorter.
754 let one_day = Duration::from_secs(24 * 60 * 60);
755 let a_day_ago = Utc::now() - one_day;
756 if a_day_ago.timestamp() > event.created {
757 log::info!(
758 "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
759 event_id
760 );
761 app.db
762 .create_processed_stripe_event(&processed_event_params)
763 .await?;
764
765 return Ok(());
766 }
767
768 let process_result = match event.type_ {
769 EventType::CustomerCreated | EventType::CustomerUpdated => {
770 handle_customer_event(app, stripe_client, event).await
771 }
772 EventType::CustomerSubscriptionCreated
773 | EventType::CustomerSubscriptionUpdated
774 | EventType::CustomerSubscriptionPaused
775 | EventType::CustomerSubscriptionResumed
776 | EventType::CustomerSubscriptionDeleted => {
777 handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
778 }
779 _ => Ok(()),
780 };
781
782 if let Some(()) = process_result
783 .with_context(|| format!("failed to process event {event_id} successfully"))
784 .log_err()
785 {
786 app.db
787 .create_processed_stripe_event(&processed_event_params)
788 .await?;
789 }
790 }
791
792 Ok(())
793}
794
795async fn handle_customer_event(
796 app: &Arc<AppState>,
797 _stripe_client: &stripe::Client,
798 event: stripe::Event,
799) -> anyhow::Result<()> {
800 let EventObject::Customer(customer) = event.data.object else {
801 bail!("unexpected event payload for {}", event.id);
802 };
803
804 log::info!("handling Stripe {} event: {}", event.type_, event.id);
805
806 let Some(email) = customer.email else {
807 log::info!("Stripe customer has no email: skipping");
808 return Ok(());
809 };
810
811 let Some(user) = app.db.get_user_by_email(&email).await? else {
812 log::info!("no user found for email: skipping");
813 return Ok(());
814 };
815
816 if let Some(existing_customer) = app
817 .db
818 .get_billing_customer_by_stripe_customer_id(&customer.id)
819 .await?
820 {
821 app.db
822 .update_billing_customer(
823 existing_customer.id,
824 &UpdateBillingCustomerParams {
825 // For now we just leave the information as-is, as it is not
826 // likely to change.
827 ..Default::default()
828 },
829 )
830 .await?;
831 } else {
832 app.db
833 .create_billing_customer(&CreateBillingCustomerParams {
834 user_id: user.id,
835 stripe_customer_id: customer.id.to_string(),
836 })
837 .await?;
838 }
839
840 Ok(())
841}
842
/// Handles a Stripe customer-subscription event by reconciling our billing
/// records with the subscription carried in the event.
///
/// Steps, in order:
///
/// 1. Derive the subscription kind from the prices of the subscription items
///    (`None` when the price IDs are not configured or no item matches).
/// 2. Find or create the billing customer for the subscription's customer.
/// 3. For a trialing Zed Pro subscription, record when the trial started.
/// 4. If the subscription was canceled because of a payment failure, flag the
///    billing customer as having overdue invoices.
/// 5. If we already track this subscription, transfer existing usage in the
///    LLM database and update the stored subscription; otherwise create it,
///    unless the user already has an active subscription.
/// 6. Refresh the user's LLM tokens.
///
/// # Errors
///
/// Fails when the event payload is not a subscription, when no billing
/// customer can be resolved, when a period timestamp is invalid, when the LLM
/// database is not initialized (existing subscriptions only), or when a
/// database operation fails.
async fn handle_customer_subscription_event(
    app: &Arc<AppState>,
    rpc_server: &Arc<Server>,
    stripe_client: &stripe::Client,
    event: stripe::Event,
) -> anyhow::Result<()> {
    let EventObject::Subscription(subscription) = event.data.object else {
        bail!("unexpected event payload for {}", event.id);
    };

    log::info!("handling Stripe {} event: {}", event.type_, event.id);

    // Classify the subscription by the first item whose price we recognize.
    let subscription_kind = maybe!({
        let zed_pro_price_id = app.config.zed_pro_price_id().ok()?;
        let zed_free_price_id = app.config.zed_free_price_id().ok()?;

        subscription.items.data.iter().find_map(|item| {
            let price = item.price.as_ref()?;

            if price.id == zed_pro_price_id {
                Some(if subscription.status == SubscriptionStatus::Trialing {
                    SubscriptionKind::ZedProTrial
                } else {
                    SubscriptionKind::ZedPro
                })
            } else if price.id == zed_free_price_id {
                Some(SubscriptionKind::ZedFree)
            } else {
                None
            }
        })
    });

    let billing_customer =
        find_or_create_billing_customer(app, stripe_client, subscription.customer)
            .await?
            .ok_or_else(|| anyhow!("billing customer not found"))?;

    // Record the start of the trial on the billing customer; this is the value
    // checked when deciding whether a user has already used their trial.
    if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
        if subscription.status == SubscriptionStatus::Trialing {
            let current_period_start =
                DateTime::from_timestamp(subscription.current_period_start, 0)
                    .ok_or_else(|| anyhow!("No trial subscription period start"))?;

            app.db
                .update_billing_customer(
                    billing_customer.id,
                    &UpdateBillingCustomerParams {
                        trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
                        ..Default::default()
                    },
                )
                .await?;
        }
    }

    let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
        && subscription
            .cancellation_details
            .as_ref()
            .and_then(|details| details.reason)
            .map_or(false, |reason| {
                reason == CancellationDetailsReason::PaymentFailed
            });

    // A payment-failure cancellation marks the customer as having overdue
    // invoices.
    if was_canceled_due_to_payment_failure {
        app.db
            .update_billing_customer(
                billing_customer.id,
                &UpdateBillingCustomerParams {
                    has_overdue_invoices: ActiveValue::set(true),
                    ..Default::default()
                },
            )
            .await?;
    }

    if let Some(existing_subscription) = app
        .db
        .get_billing_subscription_by_stripe_subscription_id(&subscription.id)
        .await?
    {
        let llm_db = app
            .llm_db
            .clone()
            .ok_or_else(|| anyhow!("LLM DB not initialized"))?;

        let new_period_start_at =
            chrono::DateTime::from_timestamp(subscription.current_period_start, 0)
                .ok_or_else(|| anyhow!("No subscription period start"))?;
        let new_period_end_at =
            chrono::DateTime::from_timestamp(subscription.current_period_end, 0)
                .ok_or_else(|| anyhow!("No subscription period end"))?;

        // Transfer usage before the stored subscription is overwritten below;
        // the call is given the subscription as it was prior to this event.
        llm_db
            .transfer_existing_subscription_usage(
                billing_customer.user_id,
                &existing_subscription,
                subscription_kind,
                new_period_start_at,
                new_period_end_at,
            )
            .await?;

        app.db
            .update_billing_subscription(
                existing_subscription.id,
                &UpdateBillingSubscriptionParams {
                    billing_customer_id: ActiveValue::set(billing_customer.id),
                    kind: ActiveValue::set(subscription_kind),
                    stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
                    stripe_subscription_status: ActiveValue::set(subscription.status.into()),
                    stripe_cancel_at: ActiveValue::set(
                        subscription
                            .cancel_at
                            .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
                            .map(|time| time.naive_utc()),
                    ),
                    stripe_cancellation_reason: ActiveValue::set(
                        subscription
                            .cancellation_details
                            .and_then(|details| details.reason)
                            .map(|reason| reason.into()),
                    ),
                    stripe_current_period_start: ActiveValue::set(Some(
                        subscription.current_period_start,
                    )),
                    stripe_current_period_end: ActiveValue::set(Some(
                        subscription.current_period_end,
                    )),
                },
            )
            .await?;
    } else {
        // If the user already has an active billing subscription, ignore the
        // event and return an `Ok` to signal that it was processed
        // successfully.
        //
        // There is the possibility that this could cause us to not create a
        // subscription in the following scenario:
        //
        //   1. User has an active subscription A
        //   2. User cancels subscription A
        //   3. User creates a new subscription B
        //   4. We process the new subscription B before the cancellation of subscription A
        //   5. User ends up with no subscriptions
        //
        // In theory this situation shouldn't arise as we try to process the events in the order they occur.
        if app
            .db
            .has_active_billing_subscription(billing_customer.user_id)
            .await?
        {
            log::info!(
                "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
                user_id = billing_customer.user_id,
                subscription_id = subscription.id
            );
            return Ok(());
        }

        app.db
            .create_billing_subscription(&CreateBillingSubscriptionParams {
                billing_customer_id: billing_customer.id,
                kind: subscription_kind,
                stripe_subscription_id: subscription.id.to_string(),
                stripe_subscription_status: subscription.status.into(),
                stripe_cancellation_reason: subscription
                    .cancellation_details
                    .and_then(|details| details.reason)
                    .map(|reason| reason.into()),
                stripe_current_period_start: Some(subscription.current_period_start),
                stripe_current_period_end: Some(subscription.current_period_end),
            })
            .await?;
    }

    // When the user's subscription changes, we want to refresh their LLM tokens
    // to either grant/revoke access.
    rpc_server
        .refresh_llm_tokens_for_user(billing_customer.user_id)
        .await;

    Ok(())
}
1028
/// Query-string parameters accepted by `get_monthly_spend`.
#[derive(Debug, Deserialize)]
struct GetMonthlySpendParams {
    /// GitHub user ID of the user whose monthly spend is being requested.
    github_user_id: i32,
}
1033
/// JSON body returned by `get_monthly_spend`. All amounts are expressed in cents.
#[derive(Debug, Serialize)]
struct GetMonthlySpendResponse {
    /// The part of this month's spending that is covered by the free-tier
    /// allowance (the smaller of total spending and the allowance).
    monthly_free_tier_spend_in_cents: u32,
    /// The free-tier allowance that was applied to this user.
    monthly_free_tier_allowance_in_cents: u32,
    /// The part of this month's spending that exceeds the free-tier allowance.
    monthly_spend_in_cents: u32,
}
1040
1041async fn get_monthly_spend(
1042 Extension(app): Extension<Arc<AppState>>,
1043 Query(params): Query<GetMonthlySpendParams>,
1044) -> Result<Json<GetMonthlySpendResponse>> {
1045 let user = app
1046 .db
1047 .get_user_by_github_user_id(params.github_user_id)
1048 .await?
1049 .ok_or_else(|| anyhow!("user not found"))?;
1050
1051 let Some(llm_db) = app.llm_db.clone() else {
1052 return Err(Error::http(
1053 StatusCode::NOT_IMPLEMENTED,
1054 "LLM database not available".into(),
1055 ));
1056 };
1057
1058 let free_tier = user
1059 .custom_llm_monthly_allowance_in_cents
1060 .map(|allowance| Cents(allowance as u32))
1061 .unwrap_or(FREE_TIER_MONTHLY_SPENDING_LIMIT);
1062
1063 let spending_for_month = llm_db
1064 .get_user_spending_for_month(user.id, Utc::now())
1065 .await?;
1066
1067 let free_tier_spend = Cents::min(spending_for_month, free_tier);
1068 let monthly_spend = spending_for_month.saturating_sub(free_tier);
1069
1070 Ok(Json(GetMonthlySpendResponse {
1071 monthly_free_tier_spend_in_cents: free_tier_spend.0,
1072 monthly_free_tier_allowance_in_cents: free_tier.0,
1073 monthly_spend_in_cents: monthly_spend.0,
1074 }))
1075}
1076
/// Query-string parameters accepted by `get_current_usage`.
#[derive(Debug, Deserialize)]
struct GetCurrentUsageParams {
    /// GitHub user ID of the user whose current usage is being requested.
    github_user_id: i32,
}
1081
/// Usage of a single metered resource, as serialized in `GetCurrentUsageResponse`.
#[derive(Debug, Serialize)]
struct UsageCounts {
    /// How many units have been used so far.
    pub used: i32,
    /// The maximum number of units allowed, or `None` when usage is unlimited.
    pub limit: Option<i32>,
    /// How many units are left before reaching `limit` (never negative), or
    /// `None` when usage is unlimited.
    pub remaining: Option<i32>,
}
1088
/// JSON body returned by `get_current_usage`.
#[derive(Debug, Serialize)]
struct GetCurrentUsageResponse {
    /// Usage counts for model requests.
    pub model_requests: UsageCounts,
    /// Usage counts for edit predictions.
    pub edit_predictions: UsageCounts,
}
1094
1095async fn get_current_usage(
1096 Extension(app): Extension<Arc<AppState>>,
1097 Query(params): Query<GetCurrentUsageParams>,
1098) -> Result<Json<GetCurrentUsageResponse>> {
1099 let user = app
1100 .db
1101 .get_user_by_github_user_id(params.github_user_id)
1102 .await?
1103 .ok_or_else(|| anyhow!("user not found"))?;
1104
1105 let Some(llm_db) = app.llm_db.clone() else {
1106 return Err(Error::http(
1107 StatusCode::NOT_IMPLEMENTED,
1108 "LLM database not available".into(),
1109 ));
1110 };
1111
1112 let empty_usage = GetCurrentUsageResponse {
1113 model_requests: UsageCounts {
1114 used: 0,
1115 limit: Some(0),
1116 remaining: Some(0),
1117 },
1118 edit_predictions: UsageCounts {
1119 used: 0,
1120 limit: Some(0),
1121 remaining: Some(0),
1122 },
1123 };
1124
1125 let Some(subscription) = app.db.get_active_billing_subscription(user.id).await? else {
1126 return Ok(Json(empty_usage));
1127 };
1128
1129 let subscription_period = maybe!({
1130 let period_start_at = subscription.current_period_start_at()?;
1131 let period_end_at = subscription.current_period_end_at()?;
1132
1133 Some((period_start_at, period_end_at))
1134 });
1135
1136 let Some((period_start_at, period_end_at)) = subscription_period else {
1137 return Ok(Json(empty_usage));
1138 };
1139
1140 let usage = llm_db
1141 .get_subscription_usage_for_period(user.id, period_start_at, period_end_at)
1142 .await?;
1143 let Some(usage) = usage else {
1144 return Ok(Json(empty_usage));
1145 };
1146
1147 let plan = match usage.plan {
1148 SubscriptionKind::ZedPro => zed_llm_client::Plan::ZedPro,
1149 SubscriptionKind::ZedProTrial => zed_llm_client::Plan::ZedProTrial,
1150 SubscriptionKind::ZedFree => zed_llm_client::Plan::Free,
1151 };
1152
1153 let model_requests_limit = match plan.model_requests_limit() {
1154 zed_llm_client::UsageLimit::Limited(limit) => Some(limit),
1155 zed_llm_client::UsageLimit::Unlimited => None,
1156 };
1157 let edit_prediction_limit = match plan.edit_predictions_limit() {
1158 zed_llm_client::UsageLimit::Limited(limit) => Some(limit),
1159 zed_llm_client::UsageLimit::Unlimited => None,
1160 };
1161
1162 Ok(Json(GetCurrentUsageResponse {
1163 model_requests: UsageCounts {
1164 used: usage.model_requests,
1165 limit: model_requests_limit,
1166 remaining: model_requests_limit.map(|limit| (limit - usage.model_requests).max(0)),
1167 },
1168 edit_predictions: UsageCounts {
1169 used: usage.edit_predictions,
1170 limit: edit_prediction_limit,
1171 remaining: edit_prediction_limit.map(|limit| (limit - usage.edit_predictions).max(0)),
1172 },
1173 }))
1174}
1175
1176impl From<SubscriptionStatus> for StripeSubscriptionStatus {
1177 fn from(value: SubscriptionStatus) -> Self {
1178 match value {
1179 SubscriptionStatus::Incomplete => Self::Incomplete,
1180 SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
1181 SubscriptionStatus::Trialing => Self::Trialing,
1182 SubscriptionStatus::Active => Self::Active,
1183 SubscriptionStatus::PastDue => Self::PastDue,
1184 SubscriptionStatus::Canceled => Self::Canceled,
1185 SubscriptionStatus::Unpaid => Self::Unpaid,
1186 SubscriptionStatus::Paused => Self::Paused,
1187 }
1188 }
1189}
1190
1191impl From<CancellationDetailsReason> for StripeCancellationReason {
1192 fn from(value: CancellationDetailsReason) -> Self {
1193 match value {
1194 CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
1195 CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
1196 CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
1197 }
1198 }
1199}
1200
1201/// Finds or creates a billing customer using the provided customer.
1202async fn find_or_create_billing_customer(
1203 app: &Arc<AppState>,
1204 stripe_client: &stripe::Client,
1205 customer_or_id: Expandable<Customer>,
1206) -> anyhow::Result<Option<billing_customer::Model>> {
1207 let customer_id = match &customer_or_id {
1208 Expandable::Id(id) => id,
1209 Expandable::Object(customer) => customer.id.as_ref(),
1210 };
1211
1212 // If we already have a billing customer record associated with the Stripe customer,
1213 // there's nothing more we need to do.
1214 if let Some(billing_customer) = app
1215 .db
1216 .get_billing_customer_by_stripe_customer_id(customer_id)
1217 .await?
1218 {
1219 return Ok(Some(billing_customer));
1220 }
1221
1222 // If all we have is a customer ID, resolve it to a full customer record by
1223 // hitting the Stripe API.
1224 let customer = match customer_or_id {
1225 Expandable::Id(id) => Customer::retrieve(stripe_client, &id, &[]).await?,
1226 Expandable::Object(customer) => *customer,
1227 };
1228
1229 let Some(email) = customer.email else {
1230 return Ok(None);
1231 };
1232
1233 let Some(user) = app.db.get_user_by_email(&email).await? else {
1234 return Ok(None);
1235 };
1236
1237 let billing_customer = app
1238 .db
1239 .create_billing_customer(&CreateBillingCustomerParams {
1240 user_id: user.id,
1241 stripe_customer_id: customer.id.to_string(),
1242 })
1243 .await?;
1244
1245 Ok(Some(billing_customer))
1246}
1247
/// How long the background task sleeps after each attempt to sync LLM token
/// usage to Stripe before starting the next attempt.
const SYNC_LLM_TOKEN_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
1249
1250pub fn sync_llm_token_usage_with_stripe_periodically(app: Arc<AppState>) {
1251 let Some(stripe_billing) = app.stripe_billing.clone() else {
1252 log::warn!("failed to retrieve Stripe billing object");
1253 return;
1254 };
1255 let Some(llm_db) = app.llm_db.clone() else {
1256 log::warn!("failed to retrieve LLM database");
1257 return;
1258 };
1259
1260 let executor = app.executor.clone();
1261 executor.spawn_detached({
1262 let executor = executor.clone();
1263 async move {
1264 loop {
1265 sync_token_usage_with_stripe(&app, &llm_db, &stripe_billing)
1266 .await
1267 .context("failed to sync LLM usage to Stripe")
1268 .trace_err();
1269 executor
1270 .sleep(SYNC_LLM_TOKEN_USAGE_WITH_STRIPE_INTERVAL)
1271 .await;
1272 }
1273 }
1274 });
1275}
1276
1277async fn sync_token_usage_with_stripe(
1278 app: &Arc<AppState>,
1279 llm_db: &Arc<LlmDatabase>,
1280 stripe_billing: &Arc<StripeBilling>,
1281) -> anyhow::Result<()> {
1282 let events = llm_db.get_billing_events().await?;
1283 let user_ids = events
1284 .iter()
1285 .map(|(event, _)| event.user_id)
1286 .collect::<HashSet<UserId>>();
1287 let stripe_subscriptions = app.db.get_active_billing_subscriptions(user_ids).await?;
1288
1289 for (event, model) in events {
1290 let Some((stripe_db_customer, stripe_db_subscription)) =
1291 stripe_subscriptions.get(&event.user_id)
1292 else {
1293 tracing::warn!(
1294 user_id = event.user_id.0,
1295 "Registered billing event for user who is not a Stripe customer. Billing events should only be created for users who are Stripe customers, so this is a mistake on our side."
1296 );
1297 continue;
1298 };
1299 let stripe_subscription_id: stripe::SubscriptionId = stripe_db_subscription
1300 .stripe_subscription_id
1301 .parse()
1302 .context("failed to parse stripe subscription id from db")?;
1303 let stripe_customer_id: stripe::CustomerId = stripe_db_customer
1304 .stripe_customer_id
1305 .parse()
1306 .context("failed to parse stripe customer id from db")?;
1307
1308 let stripe_model = stripe_billing
1309 .register_model_for_token_based_usage(&model)
1310 .await?;
1311 stripe_billing
1312 .subscribe_to_model(&stripe_subscription_id, &stripe_model)
1313 .await?;
1314 stripe_billing
1315 .bill_model_token_usage(&stripe_customer_id, &stripe_model, &event)
1316 .await?;
1317 llm_db.consume_billing_event(event.id).await?;
1318 }
1319
1320 Ok(())
1321}
1322
/// How long the background task sleeps after each attempt to sync LLM request
/// usage to Stripe before starting the next attempt.
const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
1324
1325pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
1326 let Some(stripe_billing) = app.stripe_billing.clone() else {
1327 log::warn!("failed to retrieve Stripe billing object");
1328 return;
1329 };
1330 let Some(llm_db) = app.llm_db.clone() else {
1331 log::warn!("failed to retrieve LLM database");
1332 return;
1333 };
1334
1335 let executor = app.executor.clone();
1336 executor.spawn_detached({
1337 let executor = executor.clone();
1338 async move {
1339 loop {
1340 sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
1341 .await
1342 .context("failed to sync LLM request usage to Stripe")
1343 .trace_err();
1344 executor
1345 .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
1346 .await;
1347 }
1348 }
1349 });
1350}
1351
1352async fn sync_model_request_usage_with_stripe(
1353 app: &Arc<AppState>,
1354 llm_db: &Arc<LlmDatabase>,
1355 stripe_billing: &Arc<StripeBilling>,
1356) -> anyhow::Result<()> {
1357 let usage_meters = llm_db
1358 .get_current_subscription_usage_meters(Utc::now())
1359 .await?;
1360 let user_ids = usage_meters
1361 .iter()
1362 .map(|(_, usage)| usage.user_id)
1363 .collect::<HashSet<UserId>>();
1364 let billing_subscriptions = app
1365 .db
1366 .get_active_zed_pro_billing_subscriptions(user_ids)
1367 .await?;
1368
1369 let claude_3_5_sonnet = stripe_billing
1370 .find_price_by_lookup_key("claude-3-5-sonnet-requests")
1371 .await?;
1372 let claude_3_7_sonnet = stripe_billing
1373 .find_price_by_lookup_key("claude-3-7-sonnet-requests")
1374 .await?;
1375
1376 for (usage_meter, usage) in usage_meters {
1377 maybe!(async {
1378 let Some((billing_customer, billing_subscription)) =
1379 billing_subscriptions.get(&usage.user_id)
1380 else {
1381 bail!(
1382 "Attempted to sync usage meter for user who is not a Stripe customer: {}",
1383 usage.user_id
1384 );
1385 };
1386
1387 let stripe_customer_id = billing_customer
1388 .stripe_customer_id
1389 .parse::<stripe::CustomerId>()
1390 .context("failed to parse Stripe customer ID from database")?;
1391 let stripe_subscription_id = billing_subscription
1392 .stripe_subscription_id
1393 .parse::<stripe::SubscriptionId>()
1394 .context("failed to parse Stripe subscription ID from database")?;
1395
1396 let model = llm_db.model_by_id(usage_meter.model_id)?;
1397
1398 let (price_id, meter_event_name) = match model.name.as_str() {
1399 "claude-3-5-sonnet" => (&claude_3_5_sonnet.id, "claude_3_5_sonnet/requests"),
1400 "claude-3-7-sonnet" => (&claude_3_7_sonnet.id, "claude_3_7_sonnet/requests"),
1401 model_name => {
1402 bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
1403 }
1404 };
1405
1406 stripe_billing
1407 .subscribe_to_price(&stripe_subscription_id, price_id)
1408 .await?;
1409 stripe_billing
1410 .bill_model_request_usage(
1411 &stripe_customer_id,
1412 meter_event_name,
1413 usage_meter.requests,
1414 )
1415 .await?;
1416
1417 Ok(())
1418 })
1419 .await
1420 .log_err();
1421 }
1422
1423 Ok(())
1424}