diff --git a/crates/collab/src/api/billing.rs b/crates/collab/src/api/billing.rs index 46406e0e38b9d1de06a52589c4961a814b700e55..b0de5b1c67192b511974a2e0afd5ab4469b5aaf4 100644 --- a/crates/collab/src/api/billing.rs +++ b/crates/collab/src/api/billing.rs @@ -288,7 +288,32 @@ async fn manage_billing_subscription( })) } -const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5 * 60); +/// The amount of time we wait in between each poll of Stripe events. +/// +/// This value should strike a balance between: +/// 1. Being short enough that we update quickly when something in Stripe changes +/// 2. Being long enough that we don't eat into our rate limits. +/// +/// As a point of reference, the Sequin folks say they have this at **500ms**: +/// +/// > We poll the Stripe /events endpoint every 500ms per account +/// > +/// > — https://blog.sequinstream.com/events-not-webhooks/ +const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5); + +/// The maximum number of events to return per page. +/// +/// We set this to 100 (the max) so we have to make fewer requests to Stripe. +/// +/// > Limit can range between 1 and 100, and the default is 10. +const EVENTS_LIMIT_PER_PAGE: u64 = 100; + +/// The number of pages consisting entirely of already-processed events that we +/// will see before we stop retrieving events. +/// +/// This is used to prevent over-fetching the Stripe events API for events we've +/// already seen and processed. +const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4; /// Polls the Stripe events API periodically to reconcile the records in our /// database with the data in Stripe. @@ -334,14 +359,20 @@ async fn poll_stripe_events( .map(event_type_to_string) .collect::>(); + let mut pages_of_already_processed_events = 0; let mut unprocessed_events = Vec::new(); loop { + if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP { + log::info!("saw {pages_of_already_processed_events} pages of already-processed events: stopping event retrieval"); + break; + } + log::info!("retrieving events from Stripe: {}", event_types.join(", ")); let mut params = ListEvents::new(); params.types = Some(event_types.clone()); - params.limit = Some(100); + params.limit = Some(EVENTS_LIMIT_PER_PAGE); let events = stripe::Event::list(stripe_client, ¶ms).await?; @@ -360,14 +391,21 @@ async fn poll_stripe_events( .collect::>() }; + let mut processed_events_in_page = 0; + let events_in_page = events.data.len(); for event in events.data { if processed_event_ids.contains(&event.id.to_string()) { - log::info!("Stripe event {} already processed: skipping", event.id); + processed_events_in_page += 1; + log::debug!("Stripe event {} already processed: skipping", event.id); } else { unprocessed_events.push(event); } } + if processed_events_in_page == events_in_page { + pages_of_already_processed_events += 1; + } + if !events.has_more { break; } @@ -382,33 +420,35 @@ async fn poll_stripe_events( unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id))); for event in unprocessed_events { + let event_id = event.id.clone(); let processed_event_params = CreateProcessedStripeEventParams { stripe_event_id: event.id.to_string(), stripe_event_type: event_type_to_string(event.type_), stripe_event_created_timestamp: event.created, }; - match event.type_ { + let process_result = match event.type_ { EventType::CustomerCreated | EventType::CustomerUpdated => { - handle_customer_event(app, stripe_client, event) - .await - .log_err(); + handle_customer_event(app, stripe_client, event).await } EventType::CustomerSubscriptionCreated | EventType::CustomerSubscriptionUpdated | EventType::CustomerSubscriptionPaused | EventType::CustomerSubscriptionResumed | EventType::CustomerSubscriptionDeleted => { - handle_customer_subscription_event(app, stripe_client, event) - .await - .log_err(); + handle_customer_subscription_event(app, stripe_client, event).await } - _ => {} - } + _ => Ok(()), + }; - app.db - .create_processed_stripe_event(&processed_event_params) - .await?; + if let Some(()) = process_result + .with_context(|| format!("failed to process event {event_id} successfully")) + .log_err() + { + app.db + .create_processed_stripe_event(&processed_event_params) + .await?; + } } Ok(())