1use anyhow::{Context as _, bail};
2use chrono::{DateTime, Utc};
3use collections::{HashMap, HashSet};
4use sea_orm::ActiveValue;
5use std::{sync::Arc, time::Duration};
6use stripe::{CancellationDetailsReason, EventObject, EventType, ListEvents, SubscriptionStatus};
7use util::{ResultExt, maybe};
8use zed_llm_client::LanguageModelProvider;
9
10use crate::AppState;
11use crate::db::billing_subscription::{
12 StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
13};
14use crate::llm::db::subscription_usage_meter::{self, CompletionMode};
15use crate::rpc::{ResultExt as _, Server};
16use crate::stripe_client::{
17 StripeCancellationDetailsReason, StripeClient, StripeCustomerId, StripeSubscription,
18 StripeSubscriptionId,
19};
20use crate::{db::UserId, llm::db::LlmDatabase};
21use crate::{
22 db::{
23 CreateBillingCustomerParams, CreateBillingSubscriptionParams,
24 CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
25 UpdateBillingSubscriptionParams, billing_customer,
26 },
27 stripe_billing::StripeBilling,
28};
29
30/// The amount of time we wait in between each poll of Stripe events.
31///
32/// This value should strike a balance between:
33/// 1. Being short enough that we update quickly when something in Stripe changes
34/// 2. Being long enough that we don't eat into our rate limits.
35///
36/// As a point of reference, the Sequin folks say they have this at **500ms**:
37///
38/// > We poll the Stripe /events endpoint every 500ms per account
39/// >
40/// > — https://blog.sequinstream.com/events-not-webhooks/
41const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
42
43/// The maximum number of events to return per page.
44///
45/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
46///
47/// > Limit can range between 1 and 100, and the default is 10.
48const EVENTS_LIMIT_PER_PAGE: u64 = 100;
49
50/// The number of pages consisting entirely of already-processed events that we
51/// will see before we stop retrieving events.
52///
53/// This is used to prevent over-fetching the Stripe events API for events we've
54/// already seen and processed.
55const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
56
57/// Polls the Stripe events API periodically to reconcile the records in our
58/// database with the data in Stripe.
59pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
60 let Some(real_stripe_client) = app.real_stripe_client.clone() else {
61 log::warn!("failed to retrieve Stripe client");
62 return;
63 };
64 let Some(stripe_client) = app.stripe_client.clone() else {
65 log::warn!("failed to retrieve Stripe client");
66 return;
67 };
68
69 let executor = app.executor.clone();
70 executor.spawn_detached({
71 let executor = executor.clone();
72 async move {
73 loop {
74 poll_stripe_events(&app, &rpc_server, &stripe_client, &real_stripe_client)
75 .await
76 .log_err();
77
78 executor.sleep(POLL_EVENTS_INTERVAL).await;
79 }
80 }
81 });
82}
83
84async fn poll_stripe_events(
85 app: &Arc<AppState>,
86 rpc_server: &Arc<Server>,
87 stripe_client: &Arc<dyn StripeClient>,
88 real_stripe_client: &stripe::Client,
89) -> anyhow::Result<()> {
90 let feature_flags = app.db.list_feature_flags().await?;
91 let sync_events_using_cloud = feature_flags
92 .iter()
93 .any(|flag| flag.flag == "cloud-stripe-events-polling" && flag.enabled_for_all);
94 if sync_events_using_cloud {
95 return Ok(());
96 }
97
98 fn event_type_to_string(event_type: EventType) -> String {
99 // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
100 // so we need to unquote it.
101 event_type.to_string().trim_matches('"').to_string()
102 }
103
104 let event_types = [
105 EventType::CustomerCreated,
106 EventType::CustomerUpdated,
107 EventType::CustomerSubscriptionCreated,
108 EventType::CustomerSubscriptionUpdated,
109 EventType::CustomerSubscriptionPaused,
110 EventType::CustomerSubscriptionResumed,
111 EventType::CustomerSubscriptionDeleted,
112 ]
113 .into_iter()
114 .map(event_type_to_string)
115 .collect::<Vec<_>>();
116
117 let mut pages_of_already_processed_events = 0;
118 let mut unprocessed_events = Vec::new();
119
120 log::info!(
121 "Stripe events: starting retrieval for {}",
122 event_types.join(", ")
123 );
124 let mut params = ListEvents::new();
125 params.types = Some(event_types.clone());
126 params.limit = Some(EVENTS_LIMIT_PER_PAGE);
127
128 let mut event_pages = stripe::Event::list(&real_stripe_client, ¶ms)
129 .await?
130 .paginate(params);
131
132 loop {
133 let processed_event_ids = {
134 let event_ids = event_pages
135 .page
136 .data
137 .iter()
138 .map(|event| event.id.as_str())
139 .collect::<Vec<_>>();
140 app.db
141 .get_processed_stripe_events_by_event_ids(&event_ids)
142 .await?
143 .into_iter()
144 .map(|event| event.stripe_event_id)
145 .collect::<Vec<_>>()
146 };
147
148 let mut processed_events_in_page = 0;
149 let events_in_page = event_pages.page.data.len();
150 for event in &event_pages.page.data {
151 if processed_event_ids.contains(&event.id.to_string()) {
152 processed_events_in_page += 1;
153 log::debug!("Stripe events: already processed '{}', skipping", event.id);
154 } else {
155 unprocessed_events.push(event.clone());
156 }
157 }
158
159 if processed_events_in_page == events_in_page {
160 pages_of_already_processed_events += 1;
161 }
162
163 if event_pages.page.has_more {
164 if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
165 {
166 log::info!(
167 "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
168 );
169 break;
170 } else {
171 log::info!("Stripe events: retrieving next page");
172 event_pages = event_pages.next(&real_stripe_client).await?;
173 }
174 } else {
175 break;
176 }
177 }
178
179 log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
180
181 // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
182 unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
183
184 for event in unprocessed_events {
185 let event_id = event.id.clone();
186 let processed_event_params = CreateProcessedStripeEventParams {
187 stripe_event_id: event.id.to_string(),
188 stripe_event_type: event_type_to_string(event.type_),
189 stripe_event_created_timestamp: event.created,
190 };
191
192 // If the event has happened too far in the past, we don't want to
193 // process it and risk overwriting other more-recent updates.
194 //
195 // 1 day was chosen arbitrarily. This could be made longer or shorter.
196 let one_day = Duration::from_secs(24 * 60 * 60);
197 let a_day_ago = Utc::now() - one_day;
198 if a_day_ago.timestamp() > event.created {
199 log::info!(
200 "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
201 event_id
202 );
203 app.db
204 .create_processed_stripe_event(&processed_event_params)
205 .await?;
206
207 continue;
208 }
209
210 let process_result = match event.type_ {
211 EventType::CustomerCreated | EventType::CustomerUpdated => {
212 handle_customer_event(app, real_stripe_client, event).await
213 }
214 EventType::CustomerSubscriptionCreated
215 | EventType::CustomerSubscriptionUpdated
216 | EventType::CustomerSubscriptionPaused
217 | EventType::CustomerSubscriptionResumed
218 | EventType::CustomerSubscriptionDeleted => {
219 handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
220 }
221 _ => Ok(()),
222 };
223
224 if let Some(()) = process_result
225 .with_context(|| format!("failed to process event {event_id} successfully"))
226 .log_err()
227 {
228 app.db
229 .create_processed_stripe_event(&processed_event_params)
230 .await?;
231 }
232 }
233
234 Ok(())
235}
236
237async fn handle_customer_event(
238 app: &Arc<AppState>,
239 _stripe_client: &stripe::Client,
240 event: stripe::Event,
241) -> anyhow::Result<()> {
242 let EventObject::Customer(customer) = event.data.object else {
243 bail!("unexpected event payload for {}", event.id);
244 };
245
246 log::info!("handling Stripe {} event: {}", event.type_, event.id);
247
248 let Some(email) = customer.email else {
249 log::info!("Stripe customer has no email: skipping");
250 return Ok(());
251 };
252
253 let Some(user) = app.db.get_user_by_email(&email).await? else {
254 log::info!("no user found for email: skipping");
255 return Ok(());
256 };
257
258 if let Some(existing_customer) = app
259 .db
260 .get_billing_customer_by_stripe_customer_id(&customer.id)
261 .await?
262 {
263 app.db
264 .update_billing_customer(
265 existing_customer.id,
266 &UpdateBillingCustomerParams {
267 // For now we just leave the information as-is, as it is not
268 // likely to change.
269 ..Default::default()
270 },
271 )
272 .await?;
273 } else {
274 app.db
275 .create_billing_customer(&CreateBillingCustomerParams {
276 user_id: user.id,
277 stripe_customer_id: customer.id.to_string(),
278 })
279 .await?;
280 }
281
282 Ok(())
283}
284
285async fn sync_subscription(
286 app: &Arc<AppState>,
287 stripe_client: &Arc<dyn StripeClient>,
288 subscription: StripeSubscription,
289) -> anyhow::Result<billing_customer::Model> {
290 let subscription_kind = if let Some(stripe_billing) = &app.stripe_billing {
291 stripe_billing
292 .determine_subscription_kind(&subscription)
293 .await
294 } else {
295 None
296 };
297
298 let billing_customer =
299 find_or_create_billing_customer(app, stripe_client.as_ref(), &subscription.customer)
300 .await?
301 .context("billing customer not found")?;
302
303 if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
304 if subscription.status == SubscriptionStatus::Trialing {
305 let current_period_start =
306 DateTime::from_timestamp(subscription.current_period_start, 0)
307 .context("No trial subscription period start")?;
308
309 app.db
310 .update_billing_customer(
311 billing_customer.id,
312 &UpdateBillingCustomerParams {
313 trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
314 ..Default::default()
315 },
316 )
317 .await?;
318 }
319 }
320
321 let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
322 && subscription
323 .cancellation_details
324 .as_ref()
325 .and_then(|details| details.reason)
326 .map_or(false, |reason| {
327 reason == StripeCancellationDetailsReason::PaymentFailed
328 });
329
330 if was_canceled_due_to_payment_failure {
331 app.db
332 .update_billing_customer(
333 billing_customer.id,
334 &UpdateBillingCustomerParams {
335 has_overdue_invoices: ActiveValue::set(true),
336 ..Default::default()
337 },
338 )
339 .await?;
340 }
341
342 if let Some(existing_subscription) = app
343 .db
344 .get_billing_subscription_by_stripe_subscription_id(subscription.id.0.as_ref())
345 .await?
346 {
347 app.db
348 .update_billing_subscription(
349 existing_subscription.id,
350 &UpdateBillingSubscriptionParams {
351 billing_customer_id: ActiveValue::set(billing_customer.id),
352 kind: ActiveValue::set(subscription_kind),
353 stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
354 stripe_subscription_status: ActiveValue::set(subscription.status.into()),
355 stripe_cancel_at: ActiveValue::set(
356 subscription
357 .cancel_at
358 .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
359 .map(|time| time.naive_utc()),
360 ),
361 stripe_cancellation_reason: ActiveValue::set(
362 subscription
363 .cancellation_details
364 .and_then(|details| details.reason)
365 .map(|reason| reason.into()),
366 ),
367 stripe_current_period_start: ActiveValue::set(Some(
368 subscription.current_period_start,
369 )),
370 stripe_current_period_end: ActiveValue::set(Some(
371 subscription.current_period_end,
372 )),
373 },
374 )
375 .await?;
376 } else {
377 if let Some(existing_subscription) = app
378 .db
379 .get_active_billing_subscription(billing_customer.user_id)
380 .await?
381 {
382 if existing_subscription.kind == Some(SubscriptionKind::ZedFree)
383 && subscription_kind == Some(SubscriptionKind::ZedProTrial)
384 {
385 let stripe_subscription_id = StripeSubscriptionId(
386 existing_subscription.stripe_subscription_id.clone().into(),
387 );
388
389 stripe_client
390 .cancel_subscription(&stripe_subscription_id)
391 .await?;
392 } else {
393 // If the user already has an active billing subscription, ignore the
394 // event and return an `Ok` to signal that it was processed
395 // successfully.
396 //
397 // There is the possibility that this could cause us to not create a
398 // subscription in the following scenario:
399 //
400 // 1. User has an active subscription A
401 // 2. User cancels subscription A
402 // 3. User creates a new subscription B
403 // 4. We process the new subscription B before the cancellation of subscription A
404 // 5. User ends up with no subscriptions
405 //
406 // In theory this situation shouldn't arise as we try to process the events in the order they occur.
407
408 log::info!(
409 "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
410 user_id = billing_customer.user_id,
411 subscription_id = subscription.id
412 );
413 return Ok(billing_customer);
414 }
415 }
416
417 app.db
418 .create_billing_subscription(&CreateBillingSubscriptionParams {
419 billing_customer_id: billing_customer.id,
420 kind: subscription_kind,
421 stripe_subscription_id: subscription.id.to_string(),
422 stripe_subscription_status: subscription.status.into(),
423 stripe_cancellation_reason: subscription
424 .cancellation_details
425 .and_then(|details| details.reason)
426 .map(|reason| reason.into()),
427 stripe_current_period_start: Some(subscription.current_period_start),
428 stripe_current_period_end: Some(subscription.current_period_end),
429 })
430 .await?;
431 }
432
433 if let Some(stripe_billing) = app.stripe_billing.as_ref() {
434 if subscription.status == SubscriptionStatus::Canceled
435 || subscription.status == SubscriptionStatus::Paused
436 {
437 let already_has_active_billing_subscription = app
438 .db
439 .has_active_billing_subscription(billing_customer.user_id)
440 .await?;
441 if !already_has_active_billing_subscription {
442 let stripe_customer_id =
443 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
444
445 stripe_billing
446 .subscribe_to_zed_free(stripe_customer_id)
447 .await?;
448 }
449 }
450 }
451
452 Ok(billing_customer)
453}
454
455async fn handle_customer_subscription_event(
456 app: &Arc<AppState>,
457 rpc_server: &Arc<Server>,
458 stripe_client: &Arc<dyn StripeClient>,
459 event: stripe::Event,
460) -> anyhow::Result<()> {
461 let EventObject::Subscription(subscription) = event.data.object else {
462 bail!("unexpected event payload for {}", event.id);
463 };
464
465 log::info!("handling Stripe {} event: {}", event.type_, event.id);
466
467 let billing_customer = sync_subscription(app, stripe_client, subscription.into()).await?;
468
469 // When the user's subscription changes, push down any changes to their plan.
470 rpc_server
471 .update_plan_for_user_legacy(billing_customer.user_id)
472 .await
473 .trace_err();
474
475 // When the user's subscription changes, we want to refresh their LLM tokens
476 // to either grant/revoke access.
477 rpc_server
478 .refresh_llm_tokens_for_user(billing_customer.user_id)
479 .await;
480
481 Ok(())
482}
483
484impl From<SubscriptionStatus> for StripeSubscriptionStatus {
485 fn from(value: SubscriptionStatus) -> Self {
486 match value {
487 SubscriptionStatus::Incomplete => Self::Incomplete,
488 SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
489 SubscriptionStatus::Trialing => Self::Trialing,
490 SubscriptionStatus::Active => Self::Active,
491 SubscriptionStatus::PastDue => Self::PastDue,
492 SubscriptionStatus::Canceled => Self::Canceled,
493 SubscriptionStatus::Unpaid => Self::Unpaid,
494 SubscriptionStatus::Paused => Self::Paused,
495 }
496 }
497}
498
499impl From<CancellationDetailsReason> for StripeCancellationReason {
500 fn from(value: CancellationDetailsReason) -> Self {
501 match value {
502 CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
503 CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
504 CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
505 }
506 }
507}
508
509/// Finds or creates a billing customer using the provided customer.
510pub async fn find_or_create_billing_customer(
511 app: &Arc<AppState>,
512 stripe_client: &dyn StripeClient,
513 customer_id: &StripeCustomerId,
514) -> anyhow::Result<Option<billing_customer::Model>> {
515 // If we already have a billing customer record associated with the Stripe customer,
516 // there's nothing more we need to do.
517 if let Some(billing_customer) = app
518 .db
519 .get_billing_customer_by_stripe_customer_id(customer_id.0.as_ref())
520 .await?
521 {
522 return Ok(Some(billing_customer));
523 }
524
525 let customer = stripe_client.get_customer(customer_id).await?;
526
527 let Some(email) = customer.email else {
528 return Ok(None);
529 };
530
531 let Some(user) = app.db.get_user_by_email(&email).await? else {
532 return Ok(None);
533 };
534
535 let billing_customer = app
536 .db
537 .create_billing_customer(&CreateBillingCustomerParams {
538 user_id: user.id,
539 stripe_customer_id: customer.id.to_string(),
540 })
541 .await?;
542
543 Ok(Some(billing_customer))
544}
545
546const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
547
548pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
549 let Some(stripe_billing) = app.stripe_billing.clone() else {
550 log::warn!("failed to retrieve Stripe billing object");
551 return;
552 };
553 let Some(llm_db) = app.llm_db.clone() else {
554 log::warn!("failed to retrieve LLM database");
555 return;
556 };
557
558 let executor = app.executor.clone();
559 executor.spawn_detached({
560 let executor = executor.clone();
561 async move {
562 loop {
563 sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
564 .await
565 .context("failed to sync LLM request usage to Stripe")
566 .trace_err();
567 executor
568 .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
569 .await;
570 }
571 }
572 });
573}
574
575async fn sync_model_request_usage_with_stripe(
576 app: &Arc<AppState>,
577 llm_db: &Arc<LlmDatabase>,
578 stripe_billing: &Arc<StripeBilling>,
579) -> anyhow::Result<()> {
580 let feature_flags = app.db.list_feature_flags().await?;
581 let sync_model_request_usage_using_cloud = feature_flags
582 .iter()
583 .any(|flag| flag.flag == "cloud-stripe-usage-meters-sync" && flag.enabled_for_all);
584 if sync_model_request_usage_using_cloud {
585 return Ok(());
586 }
587
588 log::info!("Stripe usage sync: Starting");
589 let started_at = Utc::now();
590
591 let staff_users = app.db.get_staff_users().await?;
592 let staff_user_ids = staff_users
593 .iter()
594 .map(|user| user.id)
595 .collect::<HashSet<UserId>>();
596
597 let usage_meters = llm_db
598 .get_current_subscription_usage_meters(Utc::now())
599 .await?;
600 let mut usage_meters_by_user_id =
601 HashMap::<UserId, Vec<subscription_usage_meter::Model>>::default();
602 for (usage_meter, usage) in usage_meters {
603 let meters = usage_meters_by_user_id.entry(usage.user_id).or_default();
604 meters.push(usage_meter);
605 }
606
607 log::info!("Stripe usage sync: Retrieving Zed Pro subscriptions");
608 let get_zed_pro_subscriptions_started_at = Utc::now();
609 let billing_subscriptions = app.db.get_active_zed_pro_billing_subscriptions().await?;
610 log::info!(
611 "Stripe usage sync: Retrieved {} Zed Pro subscriptions in {}",
612 billing_subscriptions.len(),
613 Utc::now() - get_zed_pro_subscriptions_started_at
614 );
615
616 let claude_sonnet_4 = stripe_billing
617 .find_price_by_lookup_key("claude-sonnet-4-requests")
618 .await?;
619 let claude_sonnet_4_max = stripe_billing
620 .find_price_by_lookup_key("claude-sonnet-4-requests-max")
621 .await?;
622 let claude_opus_4 = stripe_billing
623 .find_price_by_lookup_key("claude-opus-4-requests")
624 .await?;
625 let claude_opus_4_max = stripe_billing
626 .find_price_by_lookup_key("claude-opus-4-requests-max")
627 .await?;
628 let claude_3_5_sonnet = stripe_billing
629 .find_price_by_lookup_key("claude-3-5-sonnet-requests")
630 .await?;
631 let claude_3_7_sonnet = stripe_billing
632 .find_price_by_lookup_key("claude-3-7-sonnet-requests")
633 .await?;
634 let claude_3_7_sonnet_max = stripe_billing
635 .find_price_by_lookup_key("claude-3-7-sonnet-requests-max")
636 .await?;
637
638 let model_mode_combinations = [
639 ("claude-opus-4", CompletionMode::Max),
640 ("claude-opus-4", CompletionMode::Normal),
641 ("claude-sonnet-4", CompletionMode::Max),
642 ("claude-sonnet-4", CompletionMode::Normal),
643 ("claude-3-7-sonnet", CompletionMode::Max),
644 ("claude-3-7-sonnet", CompletionMode::Normal),
645 ("claude-3-5-sonnet", CompletionMode::Normal),
646 ];
647
648 let billing_subscription_count = billing_subscriptions.len();
649
650 log::info!("Stripe usage sync: Syncing {billing_subscription_count} Zed Pro subscriptions");
651
652 for (user_id, (billing_customer, billing_subscription)) in billing_subscriptions {
653 maybe!(async {
654 if staff_user_ids.contains(&user_id) {
655 return anyhow::Ok(());
656 }
657
658 let stripe_customer_id =
659 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
660 let stripe_subscription_id =
661 StripeSubscriptionId(billing_subscription.stripe_subscription_id.clone().into());
662
663 let usage_meters = usage_meters_by_user_id.get(&user_id);
664
665 for (model, mode) in &model_mode_combinations {
666 let Ok(model) =
667 llm_db.model(LanguageModelProvider::Anthropic, model)
668 else {
669 log::warn!("Failed to load model for user {user_id}: {model}");
670 continue;
671 };
672
673 let (price, meter_event_name) = match model.name.as_str() {
674 "claude-opus-4" => match mode {
675 CompletionMode::Normal => (&claude_opus_4, "claude_opus_4/requests"),
676 CompletionMode::Max => (&claude_opus_4_max, "claude_opus_4/requests/max"),
677 },
678 "claude-sonnet-4" => match mode {
679 CompletionMode::Normal => (&claude_sonnet_4, "claude_sonnet_4/requests"),
680 CompletionMode::Max => {
681 (&claude_sonnet_4_max, "claude_sonnet_4/requests/max")
682 }
683 },
684 "claude-3-5-sonnet" => (&claude_3_5_sonnet, "claude_3_5_sonnet/requests"),
685 "claude-3-7-sonnet" => match mode {
686 CompletionMode::Normal => {
687 (&claude_3_7_sonnet, "claude_3_7_sonnet/requests")
688 }
689 CompletionMode::Max => {
690 (&claude_3_7_sonnet_max, "claude_3_7_sonnet/requests/max")
691 }
692 },
693 model_name => {
694 bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
695 }
696 };
697
698 let model_requests = usage_meters
699 .and_then(|usage_meters| {
700 usage_meters
701 .iter()
702 .find(|meter| meter.model_id == model.id && meter.mode == *mode)
703 })
704 .map(|usage_meter| usage_meter.requests)
705 .unwrap_or(0);
706
707 if model_requests > 0 {
708 stripe_billing
709 .subscribe_to_price(&stripe_subscription_id, price)
710 .await?;
711 }
712
713 stripe_billing
714 .bill_model_request_usage(&stripe_customer_id, meter_event_name, model_requests)
715 .await
716 .with_context(|| {
717 format!(
718 "Failed to bill model request usage of {model_requests} for {stripe_customer_id}: {meter_event_name}",
719 )
720 })?;
721 }
722
723 Ok(())
724 })
725 .await
726 .log_err();
727 }
728
729 log::info!(
730 "Stripe usage sync: Synced {billing_subscription_count} Zed Pro subscriptions in {}",
731 Utc::now() - started_at
732 );
733
734 Ok(())
735}