@@ -452,29 +452,28 @@ async fn poll_stripe_events(
let mut pages_of_already_processed_events = 0;
let mut unprocessed_events = Vec::new();
- loop {
- if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP {
- log::info!("saw {pages_of_already_processed_events} pages of already-processed events: stopping event retrieval");
- break;
- }
-
- log::info!("retrieving events from Stripe: {}", event_types.join(", "));
-
- let mut params = ListEvents::new();
- params.types = Some(event_types.clone());
- params.limit = Some(EVENTS_LIMIT_PER_PAGE);
+ log::info!(
+ "Stripe events: starting retrieval for {}",
+ event_types.join(", ")
+ );
+ let mut params = ListEvents::new();
+ params.types = Some(event_types.clone());
+ params.limit = Some(EVENTS_LIMIT_PER_PAGE);
- let events = stripe::Event::list(stripe_client, ¶ms).await?;
+ let mut event_pages = stripe::Event::list(&stripe_client, ¶ms)
+ .await?
+ .paginate(params);
+ loop {
let processed_event_ids = {
- let event_ids = &events
+ let event_ids = event_pages
+ .page
.data
.iter()
.map(|event| event.id.as_str())
.collect::<Vec<_>>();
-
app.db
- .get_processed_stripe_events_by_event_ids(event_ids)
+ .get_processed_stripe_events_by_event_ids(&event_ids)
.await?
.into_iter()
.map(|event| event.stripe_event_id)
@@ -482,13 +481,13 @@ async fn poll_stripe_events(
};
let mut processed_events_in_page = 0;
- let events_in_page = events.data.len();
- for event in events.data {
+ let events_in_page = event_pages.page.data.len();
+ for event in &event_pages.page.data {
if processed_event_ids.contains(&event.id.to_string()) {
processed_events_in_page += 1;
- log::debug!("Stripe event {} already processed: skipping", event.id);
+ log::debug!("Stripe events: already processed '{}', skipping", event.id);
} else {
- unprocessed_events.push(event);
+ unprocessed_events.push(event.clone());
}
}
@@ -496,15 +495,21 @@ async fn poll_stripe_events(
pages_of_already_processed_events += 1;
}
- if !events.has_more {
+ if event_pages.page.has_more {
+ if pages_of_already_processed_events >= NUMBER_OF_ALREADY_PROCESSED_PAGES_BEFORE_WE_STOP
+ {
+ log::info!("Stripe events: stopping, saw {pages_of_already_processed_events} pages of already-processed events");
+ break;
+ } else {
+ log::info!("Stripe events: retrieving next page");
+ event_pages = event_pages.next(&stripe_client).await?;
+ }
+ } else {
break;
}
}
- log::info!(
- "unprocessed events from Stripe: {}",
- unprocessed_events.len()
- );
+ log::info!("Stripe events: unprocessed {}", unprocessed_events.len());
// Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
@@ -520,12 +525,12 @@ async fn poll_stripe_events(
// If the event has happened too far in the past, we don't want to
// process it and risk overwriting other more-recent updates.
//
- // 1 hour was chosen arbitrarily. This could be made longer or shorter.
- let one_hour = Duration::from_secs(60 * 60);
- let an_hour_ago = Utc::now() - one_hour;
- if an_hour_ago.timestamp() > event.created {
+ // 1 day was chosen arbitrarily. This could be made longer or shorter.
+ let one_day = Duration::from_secs(24 * 60 * 60);
+ let a_day_ago = Utc::now() - one_day;
+ if a_day_ago.timestamp() > event.created {
log::info!(
- "Stripe event {} is more than {one_hour:?} old, marking as processed",
+ "Stripe events: event '{}' is more than {one_day:?} old, marking as processed",
event_id
);
app.db