1use anyhow::{Context, anyhow, bail};
2use axum::{
3 Extension, Json, Router,
4 extract::{self, Query},
5 routing::{get, post},
6};
7use chrono::{DateTime, SecondsFormat, Utc};
8use collections::HashSet;
9use reqwest::StatusCode;
10use sea_orm::ActiveValue;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::{str::FromStr, sync::Arc, time::Duration};
14use stripe::{
15 BillingPortalSession, CancellationDetailsReason, CreateBillingPortalSession,
16 CreateBillingPortalSessionFlowData, CreateBillingPortalSessionFlowDataAfterCompletion,
17 CreateBillingPortalSessionFlowDataAfterCompletionRedirect,
18 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirm,
19 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirmItems,
20 CreateBillingPortalSessionFlowDataType, CreateCustomer, Customer, CustomerId, EventObject,
21 EventType, Expandable, ListEvents, Subscription, SubscriptionId, SubscriptionStatus,
22};
23use util::{ResultExt, maybe};
24
25use crate::api::events::SnowflakeRow;
26use crate::db::billing_subscription::{
27 StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
28};
29use crate::llm::db::subscription_usage_meter::CompletionMode;
30use crate::llm::{DEFAULT_MAX_MONTHLY_SPEND, FREE_TIER_MONTHLY_SPENDING_LIMIT};
31use crate::rpc::{ResultExt as _, Server};
32use crate::{AppState, Cents, Error, Result};
33use crate::{db::UserId, llm::db::LlmDatabase};
34use crate::{
35 db::{
36 BillingSubscriptionId, CreateBillingCustomerParams, CreateBillingSubscriptionParams,
37 CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
38 UpdateBillingPreferencesParams, UpdateBillingSubscriptionParams, billing_customer,
39 },
40 stripe_billing::StripeBilling,
41};
42
43pub fn router() -> Router {
44 Router::new()
45 .route(
46 "/billing/preferences",
47 get(get_billing_preferences).put(update_billing_preferences),
48 )
49 .route(
50 "/billing/subscriptions",
51 get(list_billing_subscriptions).post(create_billing_subscription),
52 )
53 .route(
54 "/billing/subscriptions/manage",
55 post(manage_billing_subscription),
56 )
57 .route("/billing/monthly_spend", get(get_monthly_spend))
58 .route("/billing/usage", get(get_current_usage))
59}
60
/// Query parameters accepted by the `GET /billing/preferences` handler.
#[derive(Debug, Deserialize)]
struct GetBillingPreferencesParams {
    /// GitHub user ID used to look up the user whose preferences are requested.
    github_user_id: i32,
}
65
/// JSON payload describing a user's billing preferences.
///
/// Returned by both the read and the update handlers for `/billing/preferences`.
#[derive(Debug, Serialize)]
struct BillingPreferencesResponse {
    /// Monthly cap on LLM usage spending, in cents.
    max_monthly_llm_usage_spending_in_cents: i32,
    /// Whether model request overages are enabled.
    model_request_overages_enabled: bool,
    /// Spend limit for model request overages, in cents.
    model_request_overages_spend_limit_in_cents: i32,
}
72
73async fn get_billing_preferences(
74 Extension(app): Extension<Arc<AppState>>,
75 Query(params): Query<GetBillingPreferencesParams>,
76) -> Result<Json<BillingPreferencesResponse>> {
77 let user = app
78 .db
79 .get_user_by_github_user_id(params.github_user_id)
80 .await?
81 .ok_or_else(|| anyhow!("user not found"))?;
82
83 let preferences = app.db.get_billing_preferences(user.id).await?;
84
85 Ok(Json(BillingPreferencesResponse {
86 max_monthly_llm_usage_spending_in_cents: preferences
87 .as_ref()
88 .map_or(DEFAULT_MAX_MONTHLY_SPEND.0 as i32, |preferences| {
89 preferences.max_monthly_llm_usage_spending_in_cents
90 }),
91 model_request_overages_enabled: preferences.as_ref().map_or(false, |preferences| {
92 preferences.model_request_overages_enabled
93 }),
94 model_request_overages_spend_limit_in_cents: preferences
95 .as_ref()
96 .map_or(0, |preferences| {
97 preferences.model_request_overages_spend_limit_in_cents
98 }),
99 }))
100}
101
/// JSON body accepted by the `PUT /billing/preferences` handler.
///
/// Every preference field is `#[serde(default)]`, so a field omitted from the
/// request deserializes to `0` / `false` rather than being rejected.
#[derive(Debug, Deserialize)]
struct UpdateBillingPreferencesBody {
    /// GitHub user ID used to look up the user whose preferences are updated.
    github_user_id: i32,
    /// Requested monthly cap on LLM usage spending, in cents.
    #[serde(default)]
    max_monthly_llm_usage_spending_in_cents: i32,
    /// Whether model request overages should be enabled.
    #[serde(default)]
    model_request_overages_enabled: bool,
    /// Requested spend limit for model request overages, in cents.
    #[serde(default)]
    model_request_overages_spend_limit_in_cents: i32,
}
112
113async fn update_billing_preferences(
114 Extension(app): Extension<Arc<AppState>>,
115 Extension(rpc_server): Extension<Arc<crate::rpc::Server>>,
116 extract::Json(body): extract::Json<UpdateBillingPreferencesBody>,
117) -> Result<Json<BillingPreferencesResponse>> {
118 let user = app
119 .db
120 .get_user_by_github_user_id(body.github_user_id)
121 .await?
122 .ok_or_else(|| anyhow!("user not found"))?;
123
124 let max_monthly_llm_usage_spending_in_cents =
125 body.max_monthly_llm_usage_spending_in_cents.max(0);
126 let model_request_overages_spend_limit_in_cents =
127 body.model_request_overages_spend_limit_in_cents.max(0);
128
129 let billing_preferences =
130 if let Some(_billing_preferences) = app.db.get_billing_preferences(user.id).await? {
131 app.db
132 .update_billing_preferences(
133 user.id,
134 &UpdateBillingPreferencesParams {
135 max_monthly_llm_usage_spending_in_cents: ActiveValue::set(
136 max_monthly_llm_usage_spending_in_cents,
137 ),
138 model_request_overages_enabled: ActiveValue::set(
139 body.model_request_overages_enabled,
140 ),
141 model_request_overages_spend_limit_in_cents: ActiveValue::set(
142 model_request_overages_spend_limit_in_cents,
143 ),
144 },
145 )
146 .await?
147 } else {
148 app.db
149 .create_billing_preferences(
150 user.id,
151 &crate::db::CreateBillingPreferencesParams {
152 max_monthly_llm_usage_spending_in_cents,
153 model_request_overages_enabled: body.model_request_overages_enabled,
154 model_request_overages_spend_limit_in_cents,
155 },
156 )
157 .await?
158 };
159
160 SnowflakeRow::new(
161 "Billing Preferences Updated",
162 Some(user.metrics_id),
163 user.admin,
164 None,
165 json!({
166 "user_id": user.id,
167 "model_request_overages_enabled": billing_preferences.model_request_overages_enabled,
168 "model_request_overages_spend_limit_in_cents": billing_preferences.model_request_overages_spend_limit_in_cents,
169 "max_monthly_llm_usage_spending_in_cents": billing_preferences.max_monthly_llm_usage_spending_in_cents,
170 }),
171 )
172 .write(&app.kinesis_client, &app.config.kinesis_stream)
173 .await
174 .log_err();
175
176 rpc_server.refresh_llm_tokens_for_user(user.id).await;
177
178 Ok(Json(BillingPreferencesResponse {
179 max_monthly_llm_usage_spending_in_cents: billing_preferences
180 .max_monthly_llm_usage_spending_in_cents,
181 model_request_overages_enabled: billing_preferences.model_request_overages_enabled,
182 model_request_overages_spend_limit_in_cents: billing_preferences
183 .model_request_overages_spend_limit_in_cents,
184 }))
185}
186
/// Query parameters accepted by the `GET /billing/subscriptions` handler.
#[derive(Debug, Deserialize)]
struct ListBillingSubscriptionsParams {
    /// GitHub user ID used to look up the user whose subscriptions are listed.
    github_user_id: i32,
}
191
/// JSON representation of a single billing subscription.
#[derive(Debug, Serialize)]
struct BillingSubscriptionJson {
    /// Our database ID for the subscription.
    id: BillingSubscriptionId,
    /// Human-readable plan name derived from the subscription kind.
    name: String,
    /// The subscription's Stripe status as stored in our database.
    status: StripeSubscriptionStatus,
    /// RFC 3339 timestamp of the end of the current period; only populated for
    /// Zed Pro trial subscriptions.
    trial_end_at: Option<String>,
    /// RFC 3339 timestamp at which the subscription is scheduled to be
    /// canceled, if any.
    cancel_at: Option<String>,
    /// Whether this subscription can be canceled.
    is_cancelable: bool,
}
202
/// JSON payload returned by the `GET /billing/subscriptions` handler.
#[derive(Debug, Serialize)]
struct ListBillingSubscriptionsResponse {
    /// The user's billing subscriptions.
    subscriptions: Vec<BillingSubscriptionJson>,
}
207
208async fn list_billing_subscriptions(
209 Extension(app): Extension<Arc<AppState>>,
210 Query(params): Query<ListBillingSubscriptionsParams>,
211) -> Result<Json<ListBillingSubscriptionsResponse>> {
212 let user = app
213 .db
214 .get_user_by_github_user_id(params.github_user_id)
215 .await?
216 .ok_or_else(|| anyhow!("user not found"))?;
217
218 let subscriptions = app.db.get_billing_subscriptions(user.id).await?;
219
220 Ok(Json(ListBillingSubscriptionsResponse {
221 subscriptions: subscriptions
222 .into_iter()
223 .map(|subscription| BillingSubscriptionJson {
224 id: subscription.id,
225 name: match subscription.kind {
226 Some(SubscriptionKind::ZedPro) => "Zed Pro".to_string(),
227 Some(SubscriptionKind::ZedProTrial) => "Zed Pro (Trial)".to_string(),
228 Some(SubscriptionKind::ZedFree) => "Zed Free".to_string(),
229 None => "Zed LLM Usage".to_string(),
230 },
231 status: subscription.stripe_subscription_status,
232 trial_end_at: if subscription.kind == Some(SubscriptionKind::ZedProTrial) {
233 maybe!({
234 let end_at = subscription.stripe_current_period_end?;
235 let end_at = DateTime::from_timestamp(end_at, 0)?;
236
237 Some(end_at.to_rfc3339_opts(SecondsFormat::Millis, true))
238 })
239 } else {
240 None
241 },
242 cancel_at: subscription.stripe_cancel_at.map(|cancel_at| {
243 cancel_at
244 .and_utc()
245 .to_rfc3339_opts(SecondsFormat::Millis, true)
246 }),
247 is_cancelable: subscription.stripe_subscription_status.is_cancelable()
248 && subscription.stripe_cancel_at.is_none(),
249 })
250 .collect(),
251 }))
252}
253
/// The product a new subscription checkout is for.
///
/// Deserialized from snake_case strings (`"zed_pro"`, `"zed_pro_trial"`).
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ProductCode {
    /// A paid Zed Pro subscription.
    ZedPro,
    /// A Zed Pro subscription that starts with a trial.
    ZedProTrial,
}
260
/// JSON body accepted by the `POST /billing/subscriptions` handler.
#[derive(Debug, Deserialize)]
struct CreateBillingSubscriptionBody {
    /// GitHub user ID used to look up the user who is subscribing.
    github_user_id: i32,
    /// The product to check out. When absent, the handler falls back to a
    /// token-based usage checkout.
    product: Option<ProductCode>,
}
266
/// JSON payload returned by the `POST /billing/subscriptions` handler.
#[derive(Debug, Serialize)]
struct CreateBillingSubscriptionResponse {
    /// URL of the Stripe Checkout session created for the user.
    checkout_session_url: String,
}
271
272/// Initiates a Stripe Checkout session for creating a billing subscription.
273async fn create_billing_subscription(
274 Extension(app): Extension<Arc<AppState>>,
275 extract::Json(body): extract::Json<CreateBillingSubscriptionBody>,
276) -> Result<Json<CreateBillingSubscriptionResponse>> {
277 let user = app
278 .db
279 .get_user_by_github_user_id(body.github_user_id)
280 .await?
281 .ok_or_else(|| anyhow!("user not found"))?;
282
283 let Some(stripe_client) = app.stripe_client.clone() else {
284 log::error!("failed to retrieve Stripe client");
285 Err(Error::http(
286 StatusCode::NOT_IMPLEMENTED,
287 "not supported".into(),
288 ))?
289 };
290 let Some(stripe_billing) = app.stripe_billing.clone() else {
291 log::error!("failed to retrieve Stripe billing object");
292 Err(Error::http(
293 StatusCode::NOT_IMPLEMENTED,
294 "not supported".into(),
295 ))?
296 };
297 let Some(llm_db) = app.llm_db.clone() else {
298 log::error!("failed to retrieve LLM database");
299 Err(Error::http(
300 StatusCode::NOT_IMPLEMENTED,
301 "not supported".into(),
302 ))?
303 };
304
305 if app.db.has_active_billing_subscription(user.id).await? {
306 return Err(Error::http(
307 StatusCode::CONFLICT,
308 "user already has an active subscription".into(),
309 ));
310 }
311
312 let existing_billing_customer = app.db.get_billing_customer_by_user_id(user.id).await?;
313 if let Some(existing_billing_customer) = &existing_billing_customer {
314 if existing_billing_customer.has_overdue_invoices {
315 return Err(Error::http(
316 StatusCode::PAYMENT_REQUIRED,
317 "user has overdue invoices".into(),
318 ));
319 }
320 }
321
322 let customer_id = if let Some(existing_customer) = &existing_billing_customer {
323 CustomerId::from_str(&existing_customer.stripe_customer_id)
324 .context("failed to parse customer ID")?
325 } else {
326 let existing_customer = if let Some(email) = user.email_address.as_deref() {
327 let customers = Customer::list(
328 &stripe_client,
329 &stripe::ListCustomers {
330 email: Some(email),
331 ..Default::default()
332 },
333 )
334 .await?;
335
336 customers.data.first().cloned()
337 } else {
338 None
339 };
340
341 if let Some(existing_customer) = existing_customer {
342 existing_customer.id
343 } else {
344 let customer = Customer::create(
345 &stripe_client,
346 CreateCustomer {
347 email: user.email_address.as_deref(),
348 ..Default::default()
349 },
350 )
351 .await?;
352
353 customer.id
354 }
355 };
356
357 let success_url = format!(
358 "{}/account?checkout_complete=1",
359 app.config.zed_dot_dev_url()
360 );
361
362 let checkout_session_url = match body.product {
363 Some(ProductCode::ZedPro) => {
364 stripe_billing
365 .checkout_with_price(
366 app.config.zed_pro_price_id()?,
367 customer_id,
368 &user.github_login,
369 &success_url,
370 )
371 .await?
372 }
373 Some(ProductCode::ZedProTrial) => {
374 if let Some(existing_billing_customer) = &existing_billing_customer {
375 if existing_billing_customer.trial_started_at.is_some() {
376 return Err(Error::http(
377 StatusCode::FORBIDDEN,
378 "user already used free trial".into(),
379 ));
380 }
381 }
382
383 let feature_flags = app.db.get_user_flags(user.id).await?;
384
385 stripe_billing
386 .checkout_with_zed_pro_trial(
387 app.config.zed_pro_price_id()?,
388 customer_id,
389 &user.github_login,
390 feature_flags,
391 &success_url,
392 )
393 .await?
394 }
395 None => {
396 let default_model = llm_db.model(
397 zed_llm_client::LanguageModelProvider::Anthropic,
398 "claude-3-7-sonnet",
399 )?;
400 let stripe_model = stripe_billing
401 .register_model_for_token_based_usage(default_model)
402 .await?;
403 stripe_billing
404 .checkout(customer_id, &user.github_login, &stripe_model, &success_url)
405 .await?
406 }
407 };
408
409 Ok(Json(CreateBillingSubscriptionResponse {
410 checkout_session_url,
411 }))
412}
413
/// What the user wants to do with an existing subscription.
///
/// Deserialized from snake_case strings (e.g. `"upgrade_to_pro"`).
#[derive(Debug, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ManageSubscriptionIntent {
    /// The user intends to manage their subscription.
    ///
    /// This will open the Stripe billing portal without putting the user in a specific flow.
    ManageSubscription,
    /// The user intends to upgrade to Zed Pro.
    UpgradeToPro,
    /// The user intends to cancel their subscription.
    Cancel,
    /// The user intends to stop the cancellation of their subscription.
    StopCancellation,
}
428
/// JSON body accepted by the `POST /billing/subscriptions/manage` handler.
#[derive(Debug, Deserialize)]
struct ManageBillingSubscriptionBody {
    /// GitHub user ID used to look up the user managing the subscription.
    github_user_id: i32,
    /// What the user wants to do with the subscription.
    intent: ManageSubscriptionIntent,
    /// The ID of the subscription to manage.
    subscription_id: BillingSubscriptionId,
}
436
/// JSON payload returned by the `POST /billing/subscriptions/manage` handler.
#[derive(Debug, Serialize)]
struct ManageBillingSubscriptionResponse {
    /// URL of the Stripe billing portal session, or `None` when the request
    /// was fulfilled without opening the portal.
    billing_portal_session_url: Option<String>,
}
441
442/// Initiates a Stripe customer portal session for managing a billing subscription.
443async fn manage_billing_subscription(
444 Extension(app): Extension<Arc<AppState>>,
445 extract::Json(body): extract::Json<ManageBillingSubscriptionBody>,
446) -> Result<Json<ManageBillingSubscriptionResponse>> {
447 let user = app
448 .db
449 .get_user_by_github_user_id(body.github_user_id)
450 .await?
451 .ok_or_else(|| anyhow!("user not found"))?;
452
453 let Some(stripe_client) = app.stripe_client.clone() else {
454 log::error!("failed to retrieve Stripe client");
455 Err(Error::http(
456 StatusCode::NOT_IMPLEMENTED,
457 "not supported".into(),
458 ))?
459 };
460
461 let customer = app
462 .db
463 .get_billing_customer_by_user_id(user.id)
464 .await?
465 .ok_or_else(|| anyhow!("billing customer not found"))?;
466 let customer_id = CustomerId::from_str(&customer.stripe_customer_id)
467 .context("failed to parse customer ID")?;
468
469 let subscription = app
470 .db
471 .get_billing_subscription_by_id(body.subscription_id)
472 .await?
473 .ok_or_else(|| anyhow!("subscription not found"))?;
474 let subscription_id = SubscriptionId::from_str(&subscription.stripe_subscription_id)
475 .context("failed to parse subscription ID")?;
476
477 if body.intent == ManageSubscriptionIntent::StopCancellation {
478 let updated_stripe_subscription = Subscription::update(
479 &stripe_client,
480 &subscription_id,
481 stripe::UpdateSubscription {
482 cancel_at_period_end: Some(false),
483 ..Default::default()
484 },
485 )
486 .await?;
487
488 app.db
489 .update_billing_subscription(
490 subscription.id,
491 &UpdateBillingSubscriptionParams {
492 stripe_cancel_at: ActiveValue::set(
493 updated_stripe_subscription
494 .cancel_at
495 .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
496 .map(|time| time.naive_utc()),
497 ),
498 ..Default::default()
499 },
500 )
501 .await?;
502
503 return Ok(Json(ManageBillingSubscriptionResponse {
504 billing_portal_session_url: None,
505 }));
506 }
507
508 let flow = match body.intent {
509 ManageSubscriptionIntent::ManageSubscription => None,
510 ManageSubscriptionIntent::UpgradeToPro => {
511 let zed_pro_price_id = app.config.zed_pro_price_id()?;
512 let zed_free_price_id = app.config.zed_free_price_id()?;
513
514 let stripe_subscription =
515 Subscription::retrieve(&stripe_client, &subscription_id, &[]).await?;
516
517 let is_on_zed_pro_trial = stripe_subscription.status == SubscriptionStatus::Trialing
518 && stripe_subscription.items.data.iter().any(|item| {
519 item.price
520 .as_ref()
521 .map_or(false, |price| price.id == zed_pro_price_id)
522 });
523 if is_on_zed_pro_trial {
524 // If the user is already on a Zed Pro trial and wants to upgrade to Pro, we just need to end their trial early.
525 Subscription::update(
526 &stripe_client,
527 &stripe_subscription.id,
528 stripe::UpdateSubscription {
529 trial_end: Some(stripe::Scheduled::now()),
530 ..Default::default()
531 },
532 )
533 .await?;
534
535 return Ok(Json(ManageBillingSubscriptionResponse {
536 billing_portal_session_url: None,
537 }));
538 }
539
540 let subscription_item_to_update = stripe_subscription
541 .items
542 .data
543 .iter()
544 .find_map(|item| {
545 let price = item.price.as_ref()?;
546
547 if price.id == zed_free_price_id {
548 Some(item.id.clone())
549 } else {
550 None
551 }
552 })
553 .ok_or_else(|| anyhow!("No subscription item to update"))?;
554
555 Some(CreateBillingPortalSessionFlowData {
556 type_: CreateBillingPortalSessionFlowDataType::SubscriptionUpdateConfirm,
557 subscription_update_confirm: Some(
558 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirm {
559 subscription: subscription.stripe_subscription_id,
560 items: vec![
561 CreateBillingPortalSessionFlowDataSubscriptionUpdateConfirmItems {
562 id: subscription_item_to_update.to_string(),
563 price: Some(zed_pro_price_id.to_string()),
564 quantity: Some(1),
565 },
566 ],
567 discounts: None,
568 },
569 ),
570 ..Default::default()
571 })
572 }
573 ManageSubscriptionIntent::Cancel => Some(CreateBillingPortalSessionFlowData {
574 type_: CreateBillingPortalSessionFlowDataType::SubscriptionCancel,
575 after_completion: Some(CreateBillingPortalSessionFlowDataAfterCompletion {
576 type_: stripe::CreateBillingPortalSessionFlowDataAfterCompletionType::Redirect,
577 redirect: Some(CreateBillingPortalSessionFlowDataAfterCompletionRedirect {
578 return_url: format!("{}/account", app.config.zed_dot_dev_url()),
579 }),
580 ..Default::default()
581 }),
582 subscription_cancel: Some(
583 stripe::CreateBillingPortalSessionFlowDataSubscriptionCancel {
584 subscription: subscription.stripe_subscription_id,
585 retention: None,
586 },
587 ),
588 ..Default::default()
589 }),
590 ManageSubscriptionIntent::StopCancellation => unreachable!(),
591 };
592
593 let mut params = CreateBillingPortalSession::new(customer_id);
594 params.flow_data = flow;
595 let return_url = format!("{}/account", app.config.zed_dot_dev_url());
596 params.return_url = Some(&return_url);
597
598 let session = BillingPortalSession::create(&stripe_client, params).await?;
599
600 Ok(Json(ManageBillingSubscriptionResponse {
601 billing_portal_session_url: Some(session.url),
602 }))
603}
604
/// The amount of time we wait in between each poll of Stripe events.
///
/// This value should strike a balance between:
/// 1. Being short enough that we update quickly when something in Stripe changes
/// 2. Being long enough that we don't eat into our rate limits.
///
/// As a point of reference, the Sequin folks say they have this at **500ms**
/// (we currently wait 5 seconds):
///
/// > We poll the Stripe /events endpoint every 500ms per account
/// >
/// > — https://blog.sequinstream.com/events-not-webhooks/
const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
617
/// The maximum number of events to return per page when listing Stripe events.
///
/// We set this to 100 (the max) so we have to make fewer requests to Stripe:
///
/// > Limit can range between 1 and 100, and the default is 10.
const EVENTS_LIMIT_PER_PAGE: u64 = 100;
624
/// The number of pages consisting entirely of already-processed events that we
/// will see before we stop retrieving events.
///
/// This is used to prevent over-fetching the Stripe events API for events we've
/// already seen and processed. The pages do not need to be consecutive: the
/// count accumulates over a single polling pass.
const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
631
632/// Polls the Stripe events API periodically to reconcile the records in our
633/// database with the data in Stripe.
634pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
635 let Some(stripe_client) = app.stripe_client.clone() else {
636 log::warn!("failed to retrieve Stripe client");
637 return;
638 };
639
640 let executor = app.executor.clone();
641 executor.spawn_detached({
642 let executor = executor.clone();
643 async move {
644 loop {
645 poll_stripe_events(&app, &rpc_server, &stripe_client)
646 .await
647 .log_err();
648
649 executor.sleep(POLL_EVENTS_INTERVAL).await;
650 }
651 }
652 });
653}
654
655async fn poll_stripe_events(
656 app: &Arc<AppState>,
657 rpc_server: &Arc<Server>,
658 stripe_client: &stripe::Client,
659) -> anyhow::Result<()> {
660 fn event_type_to_string(event_type: EventType) -> String {
661 // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
662 // so we need to unquote it.
663 event_type.to_string().trim_matches('"').to_string()
664 }
665
666 let event_types = [
667 EventType::CustomerCreated,
668 EventType::CustomerUpdated,
669 EventType::CustomerSubscriptionCreated,
670 EventType::CustomerSubscriptionUpdated,
671 EventType::CustomerSubscriptionPaused,
672 EventType::CustomerSubscriptionResumed,
673 EventType::CustomerSubscriptionDeleted,
674 ]
675 .into_iter()
676 .map(event_type_to_string)
677 .collect::<Vec<_>>();
678
679 let mut pages_of_already_processed_events = 0;
680 let mut unprocessed_events = Vec::new();
681
682 log::info!(
683 "Stripe events: starting retrieval for {}",
684 event_types.join(", ")
685 );
686 let mut params = ListEvents::new();
687 params.types = Some(event_types.clone());
688 params.limit = Some(EVENTS_LIMIT_PER_PAGE);
689
690 let mut event_pages = stripe::Event::list(&stripe_client, ¶ms)
691 .await?
692 .paginate(params);
693
694 loop {
695 let processed_event_ids = {
696 let event_ids = event_pages
697 .page
698 .data
699 .iter()
700 .map(|event| event.id.as_str())
701 .collect::<Vec<_>>();
702 app.db
703 .get_processed_stripe_events_by_event_ids(&event_ids)
704 .await?
705 .into_iter()
706 .map(|event| event.stripe_event_id)
707 .collect::<Vec<_>>()
708 };
709
710 let mut processed_events_in_page = 0;
711 let events_in_page = event_pages.page.data.len();
712 for event in &event_pages.page.data {
713 if processed_event_ids.contains(&event.id.to_string()) {
714 processed_events_in_page += 1;
715 log::debug!("Stripe events: already processed '{}', skipping", event.id);
716 } else {
717 unprocessed_events.push(event.clone());
718 }
719 }
720
721 if processed_events_in_page == events_in_page {
722 pages_of_already_processed_events += 1;
723 }
724
725 if event_pages.page.has_more {
726 if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
727 {
728 log::info!(
729 "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
730 );
731 break;
732 } else {
733 log::info!("Stripe events: retrieving next page");
734 event_pages = event_pages.next(&stripe_client).await?;
735 }
736 } else {
737 break;
738 }
739 }
740
741 log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
742
743 // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
744 unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
745
746 for event in unprocessed_events {
747 let event_id = event.id.clone();
748 let processed_event_params = CreateProcessedStripeEventParams {
749 stripe_event_id: event.id.to_string(),
750 stripe_event_type: event_type_to_string(event.type_),
751 stripe_event_created_timestamp: event.created,
752 };
753
754 // If the event has happened too far in the past, we don't want to
755 // process it and risk overwriting other more-recent updates.
756 //
757 // 1 day was chosen arbitrarily. This could be made longer or shorter.
758 let one_day = Duration::from_secs(24 * 60 * 60);
759 let a_day_ago = Utc::now() - one_day;
760 if a_day_ago.timestamp() > event.created {
761 log::info!(
762 "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
763 event_id
764 );
765 app.db
766 .create_processed_stripe_event(&processed_event_params)
767 .await?;
768
769 return Ok(());
770 }
771
772 let process_result = match event.type_ {
773 EventType::CustomerCreated | EventType::CustomerUpdated => {
774 handle_customer_event(app, stripe_client, event).await
775 }
776 EventType::CustomerSubscriptionCreated
777 | EventType::CustomerSubscriptionUpdated
778 | EventType::CustomerSubscriptionPaused
779 | EventType::CustomerSubscriptionResumed
780 | EventType::CustomerSubscriptionDeleted => {
781 handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
782 }
783 _ => Ok(()),
784 };
785
786 if let Some(()) = process_result
787 .with_context(|| format!("failed to process event {event_id} successfully"))
788 .log_err()
789 {
790 app.db
791 .create_processed_stripe_event(&processed_event_params)
792 .await?;
793 }
794 }
795
796 Ok(())
797}
798
799async fn handle_customer_event(
800 app: &Arc<AppState>,
801 _stripe_client: &stripe::Client,
802 event: stripe::Event,
803) -> anyhow::Result<()> {
804 let EventObject::Customer(customer) = event.data.object else {
805 bail!("unexpected event payload for {}", event.id);
806 };
807
808 log::info!("handling Stripe {} event: {}", event.type_, event.id);
809
810 let Some(email) = customer.email else {
811 log::info!("Stripe customer has no email: skipping");
812 return Ok(());
813 };
814
815 let Some(user) = app.db.get_user_by_email(&email).await? else {
816 log::info!("no user found for email: skipping");
817 return Ok(());
818 };
819
820 if let Some(existing_customer) = app
821 .db
822 .get_billing_customer_by_stripe_customer_id(&customer.id)
823 .await?
824 {
825 app.db
826 .update_billing_customer(
827 existing_customer.id,
828 &UpdateBillingCustomerParams {
829 // For now we just leave the information as-is, as it is not
830 // likely to change.
831 ..Default::default()
832 },
833 )
834 .await?;
835 } else {
836 app.db
837 .create_billing_customer(&CreateBillingCustomerParams {
838 user_id: user.id,
839 stripe_customer_id: customer.id.to_string(),
840 })
841 .await?;
842 }
843
844 Ok(())
845}
846
/// Handles a Stripe `customer.subscription.*` event by syncing the
/// subscription into our database.
///
/// Steps, in order:
/// 1. Classify the subscription as Zed Pro, Zed Pro trial, or Zed Free from
///    the prices of its items (`None` when no item matches or the price IDs
///    are not configured).
/// 2. Find or create the billing customer for the subscription's Stripe customer.
/// 3. Record the trial start on the billing customer for trialing Zed Pro
///    subscriptions.
/// 4. Flag the billing customer as having overdue invoices when the
///    subscription was canceled because of a payment failure.
/// 5. Update the existing subscription record (transferring its LLM usage to
///    the new period first), or create a new record unless the user already
///    has an active subscription.
/// 6. Refresh the user's LLM tokens.
///
/// # Errors
///
/// Fails when the event payload is not a subscription object, when no billing
/// customer can be resolved, when a period timestamp cannot be converted, when
/// the LLM database is required but not initialized, or when a database
/// operation fails.
async fn handle_customer_subscription_event(
    app: &Arc<AppState>,
    rpc_server: &Arc<Server>,
    stripe_client: &stripe::Client,
    event: stripe::Event,
) -> anyhow::Result<()> {
    let EventObject::Subscription(subscription) = event.data.object else {
        bail!("unexpected event payload for {}", event.id);
    };

    log::info!("handling Stripe {} event: {}", event.type_, event.id);

    // Derive the subscription kind from the first item whose price is one we
    // recognize. Yields `None` if either price ID is unavailable.
    let subscription_kind = maybe!({
        let zed_pro_price_id = app.config.zed_pro_price_id().ok()?;
        let zed_free_price_id = app.config.zed_free_price_id().ok()?;

        subscription.items.data.iter().find_map(|item| {
            let price = item.price.as_ref()?;

            if price.id == zed_pro_price_id {
                Some(if subscription.status == SubscriptionStatus::Trialing {
                    SubscriptionKind::ZedProTrial
                } else {
                    SubscriptionKind::ZedPro
                })
            } else if price.id == zed_free_price_id {
                Some(SubscriptionKind::ZedFree)
            } else {
                None
            }
        })
    });

    let billing_customer =
        find_or_create_billing_customer(app, stripe_client, subscription.customer)
            .await?
            .ok_or_else(|| anyhow!("billing customer not found"))?;

    // Remember when the trial started; `create_billing_subscription` uses this
    // to refuse a second trial.
    if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
        if subscription.status == SubscriptionStatus::Trialing {
            let current_period_start =
                DateTime::from_timestamp(subscription.current_period_start, 0)
                    .ok_or_else(|| anyhow!("No trial subscription period start"))?;

            app.db
                .update_billing_customer(
                    billing_customer.id,
                    &UpdateBillingCustomerParams {
                        trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
                        ..Default::default()
                    },
                )
                .await?;
        }
    }

    let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
        && subscription
            .cancellation_details
            .as_ref()
            .and_then(|details| details.reason)
            .map_or(false, |reason| {
                reason == CancellationDetailsReason::PaymentFailed
            });

    // A payment-failure cancellation marks the customer as having overdue
    // invoices, which blocks new checkouts in `create_billing_subscription`.
    if was_canceled_due_to_payment_failure {
        app.db
            .update_billing_customer(
                billing_customer.id,
                &UpdateBillingCustomerParams {
                    has_overdue_invoices: ActiveValue::set(true),
                    ..Default::default()
                },
            )
            .await?;
    }

    if let Some(existing_subscription) = app
        .db
        .get_billing_subscription_by_stripe_subscription_id(&subscription.id)
        .await?
    {
        let llm_db = app
            .llm_db
            .clone()
            .ok_or_else(|| anyhow!("LLM DB not initialized"))?;

        let new_period_start_at =
            chrono::DateTime::from_timestamp(subscription.current_period_start, 0)
                .ok_or_else(|| anyhow!("No subscription period start"))?;
        let new_period_end_at =
            chrono::DateTime::from_timestamp(subscription.current_period_end, 0)
                .ok_or_else(|| anyhow!("No subscription period end"))?;

        // Transfer usage before the subscription row is overwritten below: the
        // call is handed the subscription record as it was prior to this event.
        llm_db
            .transfer_existing_subscription_usage(
                billing_customer.user_id,
                &existing_subscription,
                subscription_kind,
                new_period_start_at,
                new_period_end_at,
            )
            .await?;

        app.db
            .update_billing_subscription(
                existing_subscription.id,
                &UpdateBillingSubscriptionParams {
                    billing_customer_id: ActiveValue::set(billing_customer.id),
                    kind: ActiveValue::set(subscription_kind),
                    stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
                    stripe_subscription_status: ActiveValue::set(subscription.status.into()),
                    stripe_cancel_at: ActiveValue::set(
                        subscription
                            .cancel_at
                            .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
                            .map(|time| time.naive_utc()),
                    ),
                    stripe_cancellation_reason: ActiveValue::set(
                        subscription
                            .cancellation_details
                            .and_then(|details| details.reason)
                            .map(|reason| reason.into()),
                    ),
                    stripe_current_period_start: ActiveValue::set(Some(
                        subscription.current_period_start,
                    )),
                    stripe_current_period_end: ActiveValue::set(Some(
                        subscription.current_period_end,
                    )),
                },
            )
            .await?;
    } else {
        // If the user already has an active billing subscription, ignore the
        // event and return an `Ok` to signal that it was processed
        // successfully.
        //
        // There is the possibility that this could cause us to not create a
        // subscription in the following scenario:
        //
        //   1. User has an active subscription A
        //   2. User cancels subscription A
        //   3. User creates a new subscription B
        //   4. We process the new subscription B before the cancellation of subscription A
        //   5. User ends up with no subscriptions
        //
        // In theory this situation shouldn't arise as we try to process the events in the order they occur.
        if app
            .db
            .has_active_billing_subscription(billing_customer.user_id)
            .await?
        {
            log::info!(
                "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
                user_id = billing_customer.user_id,
                subscription_id = subscription.id
            );
            return Ok(());
        }

        app.db
            .create_billing_subscription(&CreateBillingSubscriptionParams {
                billing_customer_id: billing_customer.id,
                kind: subscription_kind,
                stripe_subscription_id: subscription.id.to_string(),
                stripe_subscription_status: subscription.status.into(),
                stripe_cancellation_reason: subscription
                    .cancellation_details
                    .and_then(|details| details.reason)
                    .map(|reason| reason.into()),
                stripe_current_period_start: Some(subscription.current_period_start),
                stripe_current_period_end: Some(subscription.current_period_end),
            })
            .await?;
    }

    // When the user's subscription changes, we want to refresh their LLM tokens
    // to either grant/revoke access.
    rpc_server
        .refresh_llm_tokens_for_user(billing_customer.user_id)
        .await;

    Ok(())
}
1032
1033#[derive(Debug, Deserialize)]
1034struct GetMonthlySpendParams {
1035 github_user_id: i32,
1036}
1037
1038#[derive(Debug, Serialize)]
1039struct GetMonthlySpendResponse {
1040 monthly_free_tier_spend_in_cents: u32,
1041 monthly_free_tier_allowance_in_cents: u32,
1042 monthly_spend_in_cents: u32,
1043}
1044
1045async fn get_monthly_spend(
1046 Extension(app): Extension<Arc<AppState>>,
1047 Query(params): Query<GetMonthlySpendParams>,
1048) -> Result<Json<GetMonthlySpendResponse>> {
1049 let user = app
1050 .db
1051 .get_user_by_github_user_id(params.github_user_id)
1052 .await?
1053 .ok_or_else(|| anyhow!("user not found"))?;
1054
1055 let Some(llm_db) = app.llm_db.clone() else {
1056 return Err(Error::http(
1057 StatusCode::NOT_IMPLEMENTED,
1058 "LLM database not available".into(),
1059 ));
1060 };
1061
1062 let free_tier = user
1063 .custom_llm_monthly_allowance_in_cents
1064 .map(|allowance| Cents(allowance as u32))
1065 .unwrap_or(FREE_TIER_MONTHLY_SPENDING_LIMIT);
1066
1067 let spending_for_month = llm_db
1068 .get_user_spending_for_month(user.id, Utc::now())
1069 .await?;
1070
1071 let free_tier_spend = Cents::min(spending_for_month, free_tier);
1072 let monthly_spend = spending_for_month.saturating_sub(free_tier);
1073
1074 Ok(Json(GetMonthlySpendResponse {
1075 monthly_free_tier_spend_in_cents: free_tier_spend.0,
1076 monthly_free_tier_allowance_in_cents: free_tier.0,
1077 monthly_spend_in_cents: monthly_spend.0,
1078 }))
1079}
1080
1081#[derive(Debug, Deserialize)]
1082struct GetCurrentUsageParams {
1083 github_user_id: i32,
1084}
1085
1086#[derive(Debug, Serialize)]
1087struct UsageCounts {
1088 pub used: i32,
1089 pub limit: Option<i32>,
1090 pub remaining: Option<i32>,
1091}
1092
1093#[derive(Debug, Serialize)]
1094struct GetCurrentUsageResponse {
1095 pub model_requests: UsageCounts,
1096 pub edit_predictions: UsageCounts,
1097}
1098
1099async fn get_current_usage(
1100 Extension(app): Extension<Arc<AppState>>,
1101 Query(params): Query<GetCurrentUsageParams>,
1102) -> Result<Json<GetCurrentUsageResponse>> {
1103 let user = app
1104 .db
1105 .get_user_by_github_user_id(params.github_user_id)
1106 .await?
1107 .ok_or_else(|| anyhow!("user not found"))?;
1108
1109 let Some(llm_db) = app.llm_db.clone() else {
1110 return Err(Error::http(
1111 StatusCode::NOT_IMPLEMENTED,
1112 "LLM database not available".into(),
1113 ));
1114 };
1115
1116 let empty_usage = GetCurrentUsageResponse {
1117 model_requests: UsageCounts {
1118 used: 0,
1119 limit: Some(0),
1120 remaining: Some(0),
1121 },
1122 edit_predictions: UsageCounts {
1123 used: 0,
1124 limit: Some(0),
1125 remaining: Some(0),
1126 },
1127 };
1128
1129 let Some(subscription) = app.db.get_active_billing_subscription(user.id).await? else {
1130 return Ok(Json(empty_usage));
1131 };
1132
1133 let subscription_period = maybe!({
1134 let period_start_at = subscription.current_period_start_at()?;
1135 let period_end_at = subscription.current_period_end_at()?;
1136
1137 Some((period_start_at, period_end_at))
1138 });
1139
1140 let Some((period_start_at, period_end_at)) = subscription_period else {
1141 return Ok(Json(empty_usage));
1142 };
1143
1144 let usage = llm_db
1145 .get_subscription_usage_for_period(user.id, period_start_at, period_end_at)
1146 .await?;
1147 let Some(usage) = usage else {
1148 return Ok(Json(empty_usage));
1149 };
1150
1151 let plan = match usage.plan {
1152 SubscriptionKind::ZedPro => zed_llm_client::Plan::ZedPro,
1153 SubscriptionKind::ZedProTrial => zed_llm_client::Plan::ZedProTrial,
1154 SubscriptionKind::ZedFree => zed_llm_client::Plan::Free,
1155 };
1156
1157 let model_requests_limit = match plan.model_requests_limit() {
1158 zed_llm_client::UsageLimit::Limited(limit) => Some(limit),
1159 zed_llm_client::UsageLimit::Unlimited => None,
1160 };
1161 let edit_prediction_limit = match plan.edit_predictions_limit() {
1162 zed_llm_client::UsageLimit::Limited(limit) => Some(limit),
1163 zed_llm_client::UsageLimit::Unlimited => None,
1164 };
1165
1166 Ok(Json(GetCurrentUsageResponse {
1167 model_requests: UsageCounts {
1168 used: usage.model_requests,
1169 limit: model_requests_limit,
1170 remaining: model_requests_limit.map(|limit| (limit - usage.model_requests).max(0)),
1171 },
1172 edit_predictions: UsageCounts {
1173 used: usage.edit_predictions,
1174 limit: edit_prediction_limit,
1175 remaining: edit_prediction_limit.map(|limit| (limit - usage.edit_predictions).max(0)),
1176 },
1177 }))
1178}
1179
1180impl From<SubscriptionStatus> for StripeSubscriptionStatus {
1181 fn from(value: SubscriptionStatus) -> Self {
1182 match value {
1183 SubscriptionStatus::Incomplete => Self::Incomplete,
1184 SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
1185 SubscriptionStatus::Trialing => Self::Trialing,
1186 SubscriptionStatus::Active => Self::Active,
1187 SubscriptionStatus::PastDue => Self::PastDue,
1188 SubscriptionStatus::Canceled => Self::Canceled,
1189 SubscriptionStatus::Unpaid => Self::Unpaid,
1190 SubscriptionStatus::Paused => Self::Paused,
1191 }
1192 }
1193}
1194
1195impl From<CancellationDetailsReason> for StripeCancellationReason {
1196 fn from(value: CancellationDetailsReason) -> Self {
1197 match value {
1198 CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
1199 CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
1200 CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
1201 }
1202 }
1203}
1204
1205/// Finds or creates a billing customer using the provided customer.
1206async fn find_or_create_billing_customer(
1207 app: &Arc<AppState>,
1208 stripe_client: &stripe::Client,
1209 customer_or_id: Expandable<Customer>,
1210) -> anyhow::Result<Option<billing_customer::Model>> {
1211 let customer_id = match &customer_or_id {
1212 Expandable::Id(id) => id,
1213 Expandable::Object(customer) => customer.id.as_ref(),
1214 };
1215
1216 // If we already have a billing customer record associated with the Stripe customer,
1217 // there's nothing more we need to do.
1218 if let Some(billing_customer) = app
1219 .db
1220 .get_billing_customer_by_stripe_customer_id(customer_id)
1221 .await?
1222 {
1223 return Ok(Some(billing_customer));
1224 }
1225
1226 // If all we have is a customer ID, resolve it to a full customer record by
1227 // hitting the Stripe API.
1228 let customer = match customer_or_id {
1229 Expandable::Id(id) => Customer::retrieve(stripe_client, &id, &[]).await?,
1230 Expandable::Object(customer) => *customer,
1231 };
1232
1233 let Some(email) = customer.email else {
1234 return Ok(None);
1235 };
1236
1237 let Some(user) = app.db.get_user_by_email(&email).await? else {
1238 return Ok(None);
1239 };
1240
1241 let billing_customer = app
1242 .db
1243 .create_billing_customer(&CreateBillingCustomerParams {
1244 user_id: user.id,
1245 stripe_customer_id: customer.id.to_string(),
1246 })
1247 .await?;
1248
1249 Ok(Some(billing_customer))
1250}
1251
1252const SYNC_LLM_TOKEN_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
1253
1254pub fn sync_llm_token_usage_with_stripe_periodically(app: Arc<AppState>) {
1255 let Some(stripe_billing) = app.stripe_billing.clone() else {
1256 log::warn!("failed to retrieve Stripe billing object");
1257 return;
1258 };
1259 let Some(llm_db) = app.llm_db.clone() else {
1260 log::warn!("failed to retrieve LLM database");
1261 return;
1262 };
1263
1264 let executor = app.executor.clone();
1265 executor.spawn_detached({
1266 let executor = executor.clone();
1267 async move {
1268 loop {
1269 sync_token_usage_with_stripe(&app, &llm_db, &stripe_billing)
1270 .await
1271 .context("failed to sync LLM usage to Stripe")
1272 .trace_err();
1273 executor
1274 .sleep(SYNC_LLM_TOKEN_USAGE_WITH_STRIPE_INTERVAL)
1275 .await;
1276 }
1277 }
1278 });
1279}
1280
1281async fn sync_token_usage_with_stripe(
1282 app: &Arc<AppState>,
1283 llm_db: &Arc<LlmDatabase>,
1284 stripe_billing: &Arc<StripeBilling>,
1285) -> anyhow::Result<()> {
1286 let events = llm_db.get_billing_events().await?;
1287 let user_ids = events
1288 .iter()
1289 .map(|(event, _)| event.user_id)
1290 .collect::<HashSet<UserId>>();
1291 let stripe_subscriptions = app.db.get_active_billing_subscriptions(user_ids).await?;
1292
1293 for (event, model) in events {
1294 let Some((stripe_db_customer, stripe_db_subscription)) =
1295 stripe_subscriptions.get(&event.user_id)
1296 else {
1297 tracing::warn!(
1298 user_id = event.user_id.0,
1299 "Registered billing event for user who is not a Stripe customer. Billing events should only be created for users who are Stripe customers, so this is a mistake on our side."
1300 );
1301 continue;
1302 };
1303 let stripe_subscription_id: stripe::SubscriptionId = stripe_db_subscription
1304 .stripe_subscription_id
1305 .parse()
1306 .context("failed to parse stripe subscription id from db")?;
1307 let stripe_customer_id: stripe::CustomerId = stripe_db_customer
1308 .stripe_customer_id
1309 .parse()
1310 .context("failed to parse stripe customer id from db")?;
1311
1312 let stripe_model = stripe_billing
1313 .register_model_for_token_based_usage(&model)
1314 .await?;
1315 stripe_billing
1316 .subscribe_to_model(&stripe_subscription_id, &stripe_model)
1317 .await?;
1318 stripe_billing
1319 .bill_model_token_usage(&stripe_customer_id, &stripe_model, &event)
1320 .await?;
1321 llm_db.consume_billing_event(event.id).await?;
1322 }
1323
1324 Ok(())
1325}
1326
1327const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
1328
1329pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
1330 let Some(stripe_billing) = app.stripe_billing.clone() else {
1331 log::warn!("failed to retrieve Stripe billing object");
1332 return;
1333 };
1334 let Some(llm_db) = app.llm_db.clone() else {
1335 log::warn!("failed to retrieve LLM database");
1336 return;
1337 };
1338
1339 let executor = app.executor.clone();
1340 executor.spawn_detached({
1341 let executor = executor.clone();
1342 async move {
1343 loop {
1344 sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
1345 .await
1346 .context("failed to sync LLM request usage to Stripe")
1347 .trace_err();
1348 executor
1349 .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
1350 .await;
1351 }
1352 }
1353 });
1354}
1355
1356async fn sync_model_request_usage_with_stripe(
1357 app: &Arc<AppState>,
1358 llm_db: &Arc<LlmDatabase>,
1359 stripe_billing: &Arc<StripeBilling>,
1360) -> anyhow::Result<()> {
1361 let usage_meters = llm_db
1362 .get_current_subscription_usage_meters(Utc::now())
1363 .await?;
1364 let user_ids = usage_meters
1365 .iter()
1366 .map(|(_, usage)| usage.user_id)
1367 .collect::<HashSet<UserId>>();
1368 let billing_subscriptions = app
1369 .db
1370 .get_active_zed_pro_billing_subscriptions(user_ids)
1371 .await?;
1372
1373 let claude_3_5_sonnet = stripe_billing
1374 .find_price_by_lookup_key("claude-3-5-sonnet-requests")
1375 .await?;
1376 let claude_3_7_sonnet = stripe_billing
1377 .find_price_by_lookup_key("claude-3-7-sonnet-requests")
1378 .await?;
1379 let claude_3_7_sonnet_max = stripe_billing
1380 .find_price_by_lookup_key("claude-3-7-sonnet-requests-max")
1381 .await?;
1382
1383 for (usage_meter, usage) in usage_meters {
1384 maybe!(async {
1385 let Some((billing_customer, billing_subscription)) =
1386 billing_subscriptions.get(&usage.user_id)
1387 else {
1388 bail!(
1389 "Attempted to sync usage meter for user who is not a Stripe customer: {}",
1390 usage.user_id
1391 );
1392 };
1393
1394 let stripe_customer_id = billing_customer
1395 .stripe_customer_id
1396 .parse::<stripe::CustomerId>()
1397 .context("failed to parse Stripe customer ID from database")?;
1398 let stripe_subscription_id = billing_subscription
1399 .stripe_subscription_id
1400 .parse::<stripe::SubscriptionId>()
1401 .context("failed to parse Stripe subscription ID from database")?;
1402
1403 let model = llm_db.model_by_id(usage_meter.model_id)?;
1404
1405 let (price_id, meter_event_name) = match model.name.as_str() {
1406 "claude-3-5-sonnet" => (&claude_3_5_sonnet.id, "claude_3_5_sonnet/requests"),
1407 "claude-3-7-sonnet" => match usage_meter.mode {
1408 CompletionMode::Normal => (&claude_3_7_sonnet.id, "claude_3_7_sonnet/requests"),
1409 CompletionMode::Max => {
1410 (&claude_3_7_sonnet_max.id, "claude_3_7_sonnet/requests/max")
1411 }
1412 },
1413 model_name => {
1414 bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
1415 }
1416 };
1417
1418 stripe_billing
1419 .subscribe_to_price(&stripe_subscription_id, price_id)
1420 .await?;
1421 stripe_billing
1422 .bill_model_request_usage(
1423 &stripe_customer_id,
1424 meter_event_name,
1425 usage_meter.requests,
1426 )
1427 .await?;
1428
1429 Ok(())
1430 })
1431 .await
1432 .log_err();
1433 }
1434
1435 Ok(())
1436}