1use anyhow::{Context as _, bail};
2use chrono::{DateTime, Utc};
3use collections::{HashMap, HashSet};
4use sea_orm::ActiveValue;
5use std::{sync::Arc, time::Duration};
6use stripe::{CancellationDetailsReason, EventObject, EventType, ListEvents, SubscriptionStatus};
7use util::{ResultExt, maybe};
8use zed_llm_client::LanguageModelProvider;
9
10use crate::AppState;
11use crate::db::billing_subscription::{
12 StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind,
13};
14use crate::llm::db::subscription_usage_meter::{self, CompletionMode};
15use crate::rpc::{ResultExt as _, Server};
16use crate::stripe_client::{
17 StripeCancellationDetailsReason, StripeClient, StripeCustomerId, StripeSubscription,
18 StripeSubscriptionId,
19};
20use crate::{db::UserId, llm::db::LlmDatabase};
21use crate::{
22 db::{
23 CreateBillingCustomerParams, CreateBillingSubscriptionParams,
24 CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
25 UpdateBillingSubscriptionParams, billing_customer,
26 },
27 stripe_billing::StripeBilling,
28};
29
30/// The amount of time we wait in between each poll of Stripe events.
31///
32/// This value should strike a balance between:
33/// 1. Being short enough that we update quickly when something in Stripe changes
34/// 2. Being long enough that we don't eat into our rate limits.
35///
36/// As a point of reference, the Sequin folks say they have this at **500ms**:
37///
38/// > We poll the Stripe /events endpoint every 500ms per account
39/// >
40/// > — https://blog.sequinstream.com/events-not-webhooks/
41const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
42
43/// The maximum number of events to return per page.
44///
45/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
46///
47/// > Limit can range between 1 and 100, and the default is 10.
48const EVENTS_LIMIT_PER_PAGE: u64 = 100;
49
50/// The number of pages consisting entirely of already-processed events that we
51/// will see before we stop retrieving events.
52///
53/// This is used to prevent over-fetching the Stripe events API for events we've
54/// already seen and processed.
55const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
56
57/// Polls the Stripe events API periodically to reconcile the records in our
58/// database with the data in Stripe.
59pub fn poll_stripe_events_periodically(app: Arc<AppState>, rpc_server: Arc<Server>) {
60 let Some(real_stripe_client) = app.real_stripe_client.clone() else {
61 log::warn!("failed to retrieve Stripe client");
62 return;
63 };
64 let Some(stripe_client) = app.stripe_client.clone() else {
65 log::warn!("failed to retrieve Stripe client");
66 return;
67 };
68
69 let executor = app.executor.clone();
70 executor.spawn_detached({
71 let executor = executor.clone();
72 async move {
73 loop {
74 poll_stripe_events(&app, &rpc_server, &stripe_client, &real_stripe_client)
75 .await
76 .log_err();
77
78 executor.sleep(POLL_EVENTS_INTERVAL).await;
79 }
80 }
81 });
82}
83
84async fn poll_stripe_events(
85 app: &Arc<AppState>,
86 rpc_server: &Arc<Server>,
87 stripe_client: &Arc<dyn StripeClient>,
88 real_stripe_client: &stripe::Client,
89) -> anyhow::Result<()> {
90 fn event_type_to_string(event_type: EventType) -> String {
91 // Calling `to_string` on `stripe::EventType` members gives us a quoted string,
92 // so we need to unquote it.
93 event_type.to_string().trim_matches('"').to_string()
94 }
95
96 let event_types = [
97 EventType::CustomerCreated,
98 EventType::CustomerUpdated,
99 EventType::CustomerSubscriptionCreated,
100 EventType::CustomerSubscriptionUpdated,
101 EventType::CustomerSubscriptionPaused,
102 EventType::CustomerSubscriptionResumed,
103 EventType::CustomerSubscriptionDeleted,
104 ]
105 .into_iter()
106 .map(event_type_to_string)
107 .collect::<Vec<_>>();
108
109 let mut pages_of_already_processed_events = 0;
110 let mut unprocessed_events = Vec::new();
111
112 log::info!(
113 "Stripe events: starting retrieval for {}",
114 event_types.join(", ")
115 );
116 let mut params = ListEvents::new();
117 params.types = Some(event_types.clone());
118 params.limit = Some(EVENTS_LIMIT_PER_PAGE);
119
120 let mut event_pages = stripe::Event::list(&real_stripe_client, ¶ms)
121 .await?
122 .paginate(params);
123
124 loop {
125 let processed_event_ids = {
126 let event_ids = event_pages
127 .page
128 .data
129 .iter()
130 .map(|event| event.id.as_str())
131 .collect::<Vec<_>>();
132 app.db
133 .get_processed_stripe_events_by_event_ids(&event_ids)
134 .await?
135 .into_iter()
136 .map(|event| event.stripe_event_id)
137 .collect::<Vec<_>>()
138 };
139
140 let mut processed_events_in_page = 0;
141 let events_in_page = event_pages.page.data.len();
142 for event in &event_pages.page.data {
143 if processed_event_ids.contains(&event.id.to_string()) {
144 processed_events_in_page += 1;
145 log::debug!("Stripe events: already processed '{}', skipping", event.id);
146 } else {
147 unprocessed_events.push(event.clone());
148 }
149 }
150
151 if processed_events_in_page == events_in_page {
152 pages_of_already_processed_events += 1;
153 }
154
155 if event_pages.page.has_more {
156 if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
157 {
158 log::info!(
159 "Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events"
160 );
161 break;
162 } else {
163 log::info!("Stripe events: retrieving next page");
164 event_pages = event_pages.next(&real_stripe_client).await?;
165 }
166 } else {
167 break;
168 }
169 }
170
171 log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
172
173 // Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
174 unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
175
176 for event in unprocessed_events {
177 let event_id = event.id.clone();
178 let processed_event_params = CreateProcessedStripeEventParams {
179 stripe_event_id: event.id.to_string(),
180 stripe_event_type: event_type_to_string(event.type_),
181 stripe_event_created_timestamp: event.created,
182 };
183
184 // If the event has happened too far in the past, we don't want to
185 // process it and risk overwriting other more-recent updates.
186 //
187 // 1 day was chosen arbitrarily. This could be made longer or shorter.
188 let one_day = Duration::from_secs(24 * 60 * 60);
189 let a_day_ago = Utc::now() - one_day;
190 if a_day_ago.timestamp() > event.created {
191 log::info!(
192 "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
193 event_id
194 );
195 app.db
196 .create_processed_stripe_event(&processed_event_params)
197 .await?;
198
199 continue;
200 }
201
202 let process_result = match event.type_ {
203 EventType::CustomerCreated | EventType::CustomerUpdated => {
204 handle_customer_event(app, real_stripe_client, event).await
205 }
206 EventType::CustomerSubscriptionCreated
207 | EventType::CustomerSubscriptionUpdated
208 | EventType::CustomerSubscriptionPaused
209 | EventType::CustomerSubscriptionResumed
210 | EventType::CustomerSubscriptionDeleted => {
211 handle_customer_subscription_event(app, rpc_server, stripe_client, event).await
212 }
213 _ => Ok(()),
214 };
215
216 if let Some(()) = process_result
217 .with_context(|| format!("failed to process event {event_id} successfully"))
218 .log_err()
219 {
220 app.db
221 .create_processed_stripe_event(&processed_event_params)
222 .await?;
223 }
224 }
225
226 Ok(())
227}
228
229async fn handle_customer_event(
230 app: &Arc<AppState>,
231 _stripe_client: &stripe::Client,
232 event: stripe::Event,
233) -> anyhow::Result<()> {
234 let EventObject::Customer(customer) = event.data.object else {
235 bail!("unexpected event payload for {}", event.id);
236 };
237
238 log::info!("handling Stripe {} event: {}", event.type_, event.id);
239
240 let Some(email) = customer.email else {
241 log::info!("Stripe customer has no email: skipping");
242 return Ok(());
243 };
244
245 let Some(user) = app.db.get_user_by_email(&email).await? else {
246 log::info!("no user found for email: skipping");
247 return Ok(());
248 };
249
250 if let Some(existing_customer) = app
251 .db
252 .get_billing_customer_by_stripe_customer_id(&customer.id)
253 .await?
254 {
255 app.db
256 .update_billing_customer(
257 existing_customer.id,
258 &UpdateBillingCustomerParams {
259 // For now we just leave the information as-is, as it is not
260 // likely to change.
261 ..Default::default()
262 },
263 )
264 .await?;
265 } else {
266 app.db
267 .create_billing_customer(&CreateBillingCustomerParams {
268 user_id: user.id,
269 stripe_customer_id: customer.id.to_string(),
270 })
271 .await?;
272 }
273
274 Ok(())
275}
276
277async fn sync_subscription(
278 app: &Arc<AppState>,
279 stripe_client: &Arc<dyn StripeClient>,
280 subscription: StripeSubscription,
281) -> anyhow::Result<billing_customer::Model> {
282 let subscription_kind = if let Some(stripe_billing) = &app.stripe_billing {
283 stripe_billing
284 .determine_subscription_kind(&subscription)
285 .await
286 } else {
287 None
288 };
289
290 let billing_customer =
291 find_or_create_billing_customer(app, stripe_client.as_ref(), &subscription.customer)
292 .await?
293 .context("billing customer not found")?;
294
295 if let Some(SubscriptionKind::ZedProTrial) = subscription_kind {
296 if subscription.status == SubscriptionStatus::Trialing {
297 let current_period_start =
298 DateTime::from_timestamp(subscription.current_period_start, 0)
299 .context("No trial subscription period start")?;
300
301 app.db
302 .update_billing_customer(
303 billing_customer.id,
304 &UpdateBillingCustomerParams {
305 trial_started_at: ActiveValue::set(Some(current_period_start.naive_utc())),
306 ..Default::default()
307 },
308 )
309 .await?;
310 }
311 }
312
313 let was_canceled_due_to_payment_failure = subscription.status == SubscriptionStatus::Canceled
314 && subscription
315 .cancellation_details
316 .as_ref()
317 .and_then(|details| details.reason)
318 .map_or(false, |reason| {
319 reason == StripeCancellationDetailsReason::PaymentFailed
320 });
321
322 if was_canceled_due_to_payment_failure {
323 app.db
324 .update_billing_customer(
325 billing_customer.id,
326 &UpdateBillingCustomerParams {
327 has_overdue_invoices: ActiveValue::set(true),
328 ..Default::default()
329 },
330 )
331 .await?;
332 }
333
334 if let Some(existing_subscription) = app
335 .db
336 .get_billing_subscription_by_stripe_subscription_id(subscription.id.0.as_ref())
337 .await?
338 {
339 app.db
340 .update_billing_subscription(
341 existing_subscription.id,
342 &UpdateBillingSubscriptionParams {
343 billing_customer_id: ActiveValue::set(billing_customer.id),
344 kind: ActiveValue::set(subscription_kind),
345 stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
346 stripe_subscription_status: ActiveValue::set(subscription.status.into()),
347 stripe_cancel_at: ActiveValue::set(
348 subscription
349 .cancel_at
350 .and_then(|cancel_at| DateTime::from_timestamp(cancel_at, 0))
351 .map(|time| time.naive_utc()),
352 ),
353 stripe_cancellation_reason: ActiveValue::set(
354 subscription
355 .cancellation_details
356 .and_then(|details| details.reason)
357 .map(|reason| reason.into()),
358 ),
359 stripe_current_period_start: ActiveValue::set(Some(
360 subscription.current_period_start,
361 )),
362 stripe_current_period_end: ActiveValue::set(Some(
363 subscription.current_period_end,
364 )),
365 },
366 )
367 .await?;
368 } else {
369 if let Some(existing_subscription) = app
370 .db
371 .get_active_billing_subscription(billing_customer.user_id)
372 .await?
373 {
374 if existing_subscription.kind == Some(SubscriptionKind::ZedFree)
375 && subscription_kind == Some(SubscriptionKind::ZedProTrial)
376 {
377 let stripe_subscription_id = StripeSubscriptionId(
378 existing_subscription.stripe_subscription_id.clone().into(),
379 );
380
381 stripe_client
382 .cancel_subscription(&stripe_subscription_id)
383 .await?;
384 } else {
385 // If the user already has an active billing subscription, ignore the
386 // event and return an `Ok` to signal that it was processed
387 // successfully.
388 //
389 // There is the possibility that this could cause us to not create a
390 // subscription in the following scenario:
391 //
392 // 1. User has an active subscription A
393 // 2. User cancels subscription A
394 // 3. User creates a new subscription B
395 // 4. We process the new subscription B before the cancellation of subscription A
396 // 5. User ends up with no subscriptions
397 //
398 // In theory this situation shouldn't arise as we try to process the events in the order they occur.
399
400 log::info!(
401 "user {user_id} already has an active subscription, skipping creation of subscription {subscription_id}",
402 user_id = billing_customer.user_id,
403 subscription_id = subscription.id
404 );
405 return Ok(billing_customer);
406 }
407 }
408
409 app.db
410 .create_billing_subscription(&CreateBillingSubscriptionParams {
411 billing_customer_id: billing_customer.id,
412 kind: subscription_kind,
413 stripe_subscription_id: subscription.id.to_string(),
414 stripe_subscription_status: subscription.status.into(),
415 stripe_cancellation_reason: subscription
416 .cancellation_details
417 .and_then(|details| details.reason)
418 .map(|reason| reason.into()),
419 stripe_current_period_start: Some(subscription.current_period_start),
420 stripe_current_period_end: Some(subscription.current_period_end),
421 })
422 .await?;
423 }
424
425 if let Some(stripe_billing) = app.stripe_billing.as_ref() {
426 if subscription.status == SubscriptionStatus::Canceled
427 || subscription.status == SubscriptionStatus::Paused
428 {
429 let already_has_active_billing_subscription = app
430 .db
431 .has_active_billing_subscription(billing_customer.user_id)
432 .await?;
433 if !already_has_active_billing_subscription {
434 let stripe_customer_id =
435 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
436
437 stripe_billing
438 .subscribe_to_zed_free(stripe_customer_id)
439 .await?;
440 }
441 }
442 }
443
444 Ok(billing_customer)
445}
446
447async fn handle_customer_subscription_event(
448 app: &Arc<AppState>,
449 rpc_server: &Arc<Server>,
450 stripe_client: &Arc<dyn StripeClient>,
451 event: stripe::Event,
452) -> anyhow::Result<()> {
453 let EventObject::Subscription(subscription) = event.data.object else {
454 bail!("unexpected event payload for {}", event.id);
455 };
456
457 log::info!("handling Stripe {} event: {}", event.type_, event.id);
458
459 let billing_customer = sync_subscription(app, stripe_client, subscription.into()).await?;
460
461 // When the user's subscription changes, push down any changes to their plan.
462 rpc_server
463 .update_plan_for_user_legacy(billing_customer.user_id)
464 .await
465 .trace_err();
466
467 // When the user's subscription changes, we want to refresh their LLM tokens
468 // to either grant/revoke access.
469 rpc_server
470 .refresh_llm_tokens_for_user(billing_customer.user_id)
471 .await;
472
473 Ok(())
474}
475
476impl From<SubscriptionStatus> for StripeSubscriptionStatus {
477 fn from(value: SubscriptionStatus) -> Self {
478 match value {
479 SubscriptionStatus::Incomplete => Self::Incomplete,
480 SubscriptionStatus::IncompleteExpired => Self::IncompleteExpired,
481 SubscriptionStatus::Trialing => Self::Trialing,
482 SubscriptionStatus::Active => Self::Active,
483 SubscriptionStatus::PastDue => Self::PastDue,
484 SubscriptionStatus::Canceled => Self::Canceled,
485 SubscriptionStatus::Unpaid => Self::Unpaid,
486 SubscriptionStatus::Paused => Self::Paused,
487 }
488 }
489}
490
491impl From<CancellationDetailsReason> for StripeCancellationReason {
492 fn from(value: CancellationDetailsReason) -> Self {
493 match value {
494 CancellationDetailsReason::CancellationRequested => Self::CancellationRequested,
495 CancellationDetailsReason::PaymentDisputed => Self::PaymentDisputed,
496 CancellationDetailsReason::PaymentFailed => Self::PaymentFailed,
497 }
498 }
499}
500
501/// Finds or creates a billing customer using the provided customer.
502pub async fn find_or_create_billing_customer(
503 app: &Arc<AppState>,
504 stripe_client: &dyn StripeClient,
505 customer_id: &StripeCustomerId,
506) -> anyhow::Result<Option<billing_customer::Model>> {
507 // If we already have a billing customer record associated with the Stripe customer,
508 // there's nothing more we need to do.
509 if let Some(billing_customer) = app
510 .db
511 .get_billing_customer_by_stripe_customer_id(customer_id.0.as_ref())
512 .await?
513 {
514 return Ok(Some(billing_customer));
515 }
516
517 let customer = stripe_client.get_customer(customer_id).await?;
518
519 let Some(email) = customer.email else {
520 return Ok(None);
521 };
522
523 let Some(user) = app.db.get_user_by_email(&email).await? else {
524 return Ok(None);
525 };
526
527 let billing_customer = app
528 .db
529 .create_billing_customer(&CreateBillingCustomerParams {
530 user_id: user.id,
531 stripe_customer_id: customer.id.to_string(),
532 })
533 .await?;
534
535 Ok(Some(billing_customer))
536}
537
538const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60);
539
540pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc<AppState>) {
541 let Some(stripe_billing) = app.stripe_billing.clone() else {
542 log::warn!("failed to retrieve Stripe billing object");
543 return;
544 };
545 let Some(llm_db) = app.llm_db.clone() else {
546 log::warn!("failed to retrieve LLM database");
547 return;
548 };
549
550 let executor = app.executor.clone();
551 executor.spawn_detached({
552 let executor = executor.clone();
553 async move {
554 loop {
555 sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing)
556 .await
557 .context("failed to sync LLM request usage to Stripe")
558 .trace_err();
559 executor
560 .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL)
561 .await;
562 }
563 }
564 });
565}
566
567async fn sync_model_request_usage_with_stripe(
568 app: &Arc<AppState>,
569 llm_db: &Arc<LlmDatabase>,
570 stripe_billing: &Arc<StripeBilling>,
571) -> anyhow::Result<()> {
572 log::info!("Stripe usage sync: Starting");
573 let started_at = Utc::now();
574
575 let staff_users = app.db.get_staff_users().await?;
576 let staff_user_ids = staff_users
577 .iter()
578 .map(|user| user.id)
579 .collect::<HashSet<UserId>>();
580
581 let usage_meters = llm_db
582 .get_current_subscription_usage_meters(Utc::now())
583 .await?;
584 let mut usage_meters_by_user_id =
585 HashMap::<UserId, Vec<subscription_usage_meter::Model>>::default();
586 for (usage_meter, usage) in usage_meters {
587 let meters = usage_meters_by_user_id.entry(usage.user_id).or_default();
588 meters.push(usage_meter);
589 }
590
591 log::info!("Stripe usage sync: Retrieving Zed Pro subscriptions");
592 let get_zed_pro_subscriptions_started_at = Utc::now();
593 let billing_subscriptions = app.db.get_active_zed_pro_billing_subscriptions().await?;
594 log::info!(
595 "Stripe usage sync: Retrieved {} Zed Pro subscriptions in {}",
596 billing_subscriptions.len(),
597 Utc::now() - get_zed_pro_subscriptions_started_at
598 );
599
600 let claude_sonnet_4 = stripe_billing
601 .find_price_by_lookup_key("claude-sonnet-4-requests")
602 .await?;
603 let claude_sonnet_4_max = stripe_billing
604 .find_price_by_lookup_key("claude-sonnet-4-requests-max")
605 .await?;
606 let claude_opus_4 = stripe_billing
607 .find_price_by_lookup_key("claude-opus-4-requests")
608 .await?;
609 let claude_opus_4_max = stripe_billing
610 .find_price_by_lookup_key("claude-opus-4-requests-max")
611 .await?;
612 let claude_3_5_sonnet = stripe_billing
613 .find_price_by_lookup_key("claude-3-5-sonnet-requests")
614 .await?;
615 let claude_3_7_sonnet = stripe_billing
616 .find_price_by_lookup_key("claude-3-7-sonnet-requests")
617 .await?;
618 let claude_3_7_sonnet_max = stripe_billing
619 .find_price_by_lookup_key("claude-3-7-sonnet-requests-max")
620 .await?;
621
622 let model_mode_combinations = [
623 ("claude-opus-4", CompletionMode::Max),
624 ("claude-opus-4", CompletionMode::Normal),
625 ("claude-sonnet-4", CompletionMode::Max),
626 ("claude-sonnet-4", CompletionMode::Normal),
627 ("claude-3-7-sonnet", CompletionMode::Max),
628 ("claude-3-7-sonnet", CompletionMode::Normal),
629 ("claude-3-5-sonnet", CompletionMode::Normal),
630 ];
631
632 let billing_subscription_count = billing_subscriptions.len();
633
634 log::info!("Stripe usage sync: Syncing {billing_subscription_count} Zed Pro subscriptions");
635
636 for (user_id, (billing_customer, billing_subscription)) in billing_subscriptions {
637 maybe!(async {
638 if staff_user_ids.contains(&user_id) {
639 return anyhow::Ok(());
640 }
641
642 let stripe_customer_id =
643 StripeCustomerId(billing_customer.stripe_customer_id.clone().into());
644 let stripe_subscription_id =
645 StripeSubscriptionId(billing_subscription.stripe_subscription_id.clone().into());
646
647 let usage_meters = usage_meters_by_user_id.get(&user_id);
648
649 for (model, mode) in &model_mode_combinations {
650 let Ok(model) =
651 llm_db.model(LanguageModelProvider::Anthropic, model)
652 else {
653 log::warn!("Failed to load model for user {user_id}: {model}");
654 continue;
655 };
656
657 let (price, meter_event_name) = match model.name.as_str() {
658 "claude-opus-4" => match mode {
659 CompletionMode::Normal => (&claude_opus_4, "claude_opus_4/requests"),
660 CompletionMode::Max => (&claude_opus_4_max, "claude_opus_4/requests/max"),
661 },
662 "claude-sonnet-4" => match mode {
663 CompletionMode::Normal => (&claude_sonnet_4, "claude_sonnet_4/requests"),
664 CompletionMode::Max => {
665 (&claude_sonnet_4_max, "claude_sonnet_4/requests/max")
666 }
667 },
668 "claude-3-5-sonnet" => (&claude_3_5_sonnet, "claude_3_5_sonnet/requests"),
669 "claude-3-7-sonnet" => match mode {
670 CompletionMode::Normal => {
671 (&claude_3_7_sonnet, "claude_3_7_sonnet/requests")
672 }
673 CompletionMode::Max => {
674 (&claude_3_7_sonnet_max, "claude_3_7_sonnet/requests/max")
675 }
676 },
677 model_name => {
678 bail!("Attempted to sync usage meter for unsupported model: {model_name:?}")
679 }
680 };
681
682 let model_requests = usage_meters
683 .and_then(|usage_meters| {
684 usage_meters
685 .iter()
686 .find(|meter| meter.model_id == model.id && meter.mode == *mode)
687 })
688 .map(|usage_meter| usage_meter.requests)
689 .unwrap_or(0);
690
691 if model_requests > 0 {
692 stripe_billing
693 .subscribe_to_price(&stripe_subscription_id, price)
694 .await?;
695 }
696
697 stripe_billing
698 .bill_model_request_usage(&stripe_customer_id, meter_event_name, model_requests)
699 .await
700 .with_context(|| {
701 format!(
702 "Failed to bill model request usage of {model_requests} for {stripe_customer_id}: {meter_event_name}",
703 )
704 })?;
705 }
706
707 Ok(())
708 })
709 .await
710 .log_err();
711 }
712
713 log::info!(
714 "Stripe usage sync: Synced {billing_subscription_count} Zed Pro subscriptions in {}",
715 Utc::now() - started_at
716 );
717
718 Ok(())
719}