@@ -288,7 +288,32 @@ async fn manage_billing_subscription(
}))
}
-const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5 * 60);
+/// The amount of time we wait in between each poll of Stripe events.
+///
+/// This value should strike a balance between:
+/// 1. Being short enough that we update quickly when something in Stripe changes
+/// 2. Being long enough that we don't eat into our rate limits.
+///
+/// As a point of reference, the Sequin folks say they have this at **500ms**:
+///
+/// > We poll the Stripe /events endpoint every 500ms per account
+/// >
+/// > — https://blog.sequinstream.com/events-not-webhooks/
+const POLL_EVENTS_INTERVAL: Duration = Duration::from_secs(5);
+
+/// The maximum number of events to return per page.
+///
+/// We set this to 100 (the max) so we have to make fewer requests to Stripe.
+///
+/// > Limit can range between 1 and 100, and the default is 10.
+const EVENTS_LIMIT_PER_PAGE: u64 = 100;
+
+/// The number of pages consisting entirely of already-processed events that we
+/// will see before we stop retrieving events.
+///
+/// This is used to prevent over-fetching the Stripe events API for events we've
+/// already seen and processed.
+const NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP: usize = 4;
/// Polls the Stripe events API periodically to reconcile the records in our
/// database with the data in Stripe.
@@ -334,14 +359,20 @@ async fn poll_stripe_events(
.map(event_type_to_string)
.collect::<Vec<_>>();
+ let mut pages_of_already_processed_events = 0;
let mut unprocessed_events = Vec::new();
loop {
+ if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP {
+ log::info!("saw {pages_of_already_processed_events} pages of already-processed events: stopping event retrieval");
+ break;
+ }
+
log::info!("retrieving events from Stripe: {}", event_types.join(", "));
let mut params = ListEvents::new();
params.types = Some(event_types.clone());
- params.limit = Some(100);
+ params.limit = Some(EVENTS_LIMIT_PER_PAGE);
let events = stripe::Event::list(stripe_client, ¶ms).await?;
@@ -360,14 +391,21 @@ async fn poll_stripe_events(
.collect::<Vec<_>>()
};
+ let mut processed_events_in_page = 0;
+ let events_in_page = events.data.len();
for event in events.data {
if processed_event_ids.contains(&event.id.to_string()) {
- log::info!("Stripe event {} already processed: skipping", event.id);
+ processed_events_in_page += 1;
+ log::debug!("Stripe event {} already processed: skipping", event.id);
} else {
unprocessed_events.push(event);
}
}
+ if processed_events_in_page == events_in_page {
+ pages_of_already_processed_events += 1;
+ }
+
if !events.has_more {
break;
}
@@ -382,33 +420,35 @@ async fn poll_stripe_events(
unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
for event in unprocessed_events {
+ let event_id = event.id.clone();
let processed_event_params = CreateProcessedStripeEventParams {
stripe_event_id: event.id.to_string(),
stripe_event_type: event_type_to_string(event.type_),
stripe_event_created_timestamp: event.created,
};
- match event.type_ {
+ let process_result = match event.type_ {
EventType::CustomerCreated | EventType::CustomerUpdated => {
- handle_customer_event(app, stripe_client, event)
- .await
- .log_err();
+ handle_customer_event(app, stripe_client, event).await
}
EventType::CustomerSubscriptionCreated
| EventType::CustomerSubscriptionUpdated
| EventType::CustomerSubscriptionPaused
| EventType::CustomerSubscriptionResumed
| EventType::CustomerSubscriptionDeleted => {
- handle_customer_subscription_event(app, stripe_client, event)
- .await
- .log_err();
+ handle_customer_subscription_event(app, stripe_client, event).await
}
- _ => {}
- }
+ _ => Ok(()),
+ };
- app.db
- .create_processed_stripe_event(&processed_event_params)
- .await?;
+ if let Some(()) = process_result
+ .with_context(|| format!("failed to process event {event_id} successfully"))
+ .log_err()
+ {
+ app.db
+ .create_processed_stripe_event(&processed_event_params)
+ .await?;
+ }
}
Ok(())