event_coalescer.rs

  1use chrono::{DateTime, Duration, Utc};
  2use std::time;
  3
  4const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);
  5const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
  6
  7#[derive(Debug, PartialEq)]
  8struct PeriodData {
  9    environment: &'static str,
 10    start: DateTime<Utc>,
 11    end: Option<DateTime<Utc>>,
 12}
 13
 14pub struct EventCoalescer {
 15    state: Option<PeriodData>,
 16}
 17
 18impl EventCoalescer {
 19    pub fn new() -> Self {
 20        Self { state: None }
 21    }
 22
 23    pub fn log_event(
 24        &mut self,
 25        environment: &'static str,
 26    ) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
 27        self.log_event_with_time(Utc::now(), environment)
 28    }
 29
 30    // pub fn close_current_period(&mut self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 31    //     self.environment.map(|env| self.log_event(env)).flatten()
 32    // }
 33
 34    fn log_event_with_time(
 35        &mut self,
 36        log_time: DateTime<Utc>,
 37        environment: &'static str,
 38    ) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
 39        let coalesce_timeout = Duration::from_std(COALESCE_TIMEOUT).unwrap();
 40
 41        let Some(state) = &mut self.state else {
 42            self.state = Some(PeriodData {
 43                start: log_time,
 44                end: None,
 45                environment,
 46            });
 47            return None;
 48        };
 49
 50        let period_end = state
 51            .end
 52            .unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
 53        let within_timeout = log_time - period_end < coalesce_timeout;
 54        let environment_is_same = state.environment == environment;
 55        let should_coaelesce = !within_timeout || !environment_is_same;
 56
 57        if should_coaelesce {
 58            let previous_environment = state.environment;
 59            let original_start = state.start;
 60
 61            state.start = log_time;
 62            state.end = None;
 63            state.environment = environment;
 64
 65            return Some((
 66                original_start,
 67                if within_timeout { log_time } else { period_end },
 68                previous_environment,
 69            ));
 70        }
 71
 72        state.end = Some(log_time);
 73
 74        None
 75    }
 76}
 77
 78#[cfg(test)]
 79mod tests {
 80    use chrono::TimeZone;
 81
 82    use super::*;
 83
 84    #[test]
 85    fn test_same_context_exceeding_timeout() {
 86        let environment_1 = "environment_1";
 87        let mut event_coalescer = EventCoalescer::new();
 88
 89        assert_eq!(event_coalescer.state, None);
 90
 91        let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
 92        let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
 93
 94        assert_eq!(period_data, None);
 95        assert_eq!(
 96            event_coalescer.state,
 97            Some(PeriodData {
 98                start: period_start,
 99                end: None,
100                environment: environment_1,
101            })
102        );
103
104        let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
105        let mut period_end = period_start;
106
107        // Ensure that many calls within the timeout don't start a new period
108        for _ in 0..100 {
109            period_end += within_timeout_adjustment;
110            let period_data = event_coalescer.log_event_with_time(period_end, environment_1);
111
112            assert_eq!(period_data, None);
113            assert_eq!(
114                event_coalescer.state,
115                Some(PeriodData {
116                    start: period_start,
117                    end: Some(period_end),
118                    environment: environment_1,
119                })
120            );
121        }
122
123        let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
124        // Logging an event exceeding the timeout should start a new period
125        let new_period_start = period_end + exceed_timeout_adjustment;
126        let period_data = event_coalescer.log_event_with_time(new_period_start, environment_1);
127
128        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
129        assert_eq!(
130            event_coalescer.state,
131            Some(PeriodData {
132                start: new_period_start,
133                end: None,
134                environment: environment_1,
135            })
136        );
137    }
138
139    #[test]
140    fn test_different_environment_under_timeout() {
141        let environment_1 = "environment_1";
142        let mut event_coalescer = EventCoalescer::new();
143
144        assert_eq!(event_coalescer.state, None);
145
146        let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
147        let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
148
149        assert_eq!(period_data, None);
150        assert_eq!(
151            event_coalescer.state,
152            Some(PeriodData {
153                start: period_start,
154                end: None,
155                environment: environment_1,
156            })
157        );
158
159        let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
160        let period_end = period_start + within_timeout_adjustment;
161        let period_data = event_coalescer.log_event_with_time(period_end, environment_1);
162
163        assert_eq!(period_data, None);
164        assert_eq!(
165            event_coalescer.state,
166            Some(PeriodData {
167                start: period_start,
168                end: Some(period_end),
169                environment: environment_1,
170            })
171        );
172
173        // Logging an event within the timeout but with a different environment should start a new period
174        let period_end = period_end + within_timeout_adjustment;
175        let environment_2 = "environment_2";
176        let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
177
178        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
179        assert_eq!(
180            event_coalescer.state,
181            Some(PeriodData {
182                start: period_end,
183                end: None,
184                environment: environment_2,
185            })
186        );
187    }
188
189    #[test]
190    fn test_switching_environment_while_within_timeout() {
191        let environment_1 = "environment_1";
192        let mut event_coalescer = EventCoalescer::new();
193
194        assert_eq!(event_coalescer.state, None);
195
196        let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
197        let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
198
199        assert_eq!(period_data, None);
200        assert_eq!(
201            event_coalescer.state,
202            Some(PeriodData {
203                start: period_start,
204                end: None,
205                environment: environment_1,
206            })
207        );
208
209        let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
210        let period_end = period_start + within_timeout_adjustment;
211        let environment_2 = "environment_2";
212        let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
213
214        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
215        assert_eq!(
216            event_coalescer.state,
217            Some(PeriodData {
218                start: period_end,
219                end: None,
220                environment: environment_2,
221            })
222        );
223    }
224    // // 0                   20                  40                  60
225    // // |-------------------|-------------------|-------------------|-------------------
226    // // |--------|----------env change
227    // //          |-------------------
228    // // |period_start       |period_end
229    // //                     |new_period_start
230
231    #[test]
232    fn test_switching_environment_while_exceeding_timeout() {
233        let environment_1 = "environment_1";
234        let mut event_coalescer = EventCoalescer::new();
235
236        assert_eq!(event_coalescer.state, None);
237
238        let period_start = Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap();
239        let period_data = event_coalescer.log_event_with_time(period_start, environment_1);
240
241        assert_eq!(period_data, None);
242        assert_eq!(
243            event_coalescer.state,
244            Some(PeriodData {
245                start: period_start,
246                end: None,
247                environment: environment_1,
248            })
249        );
250
251        let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
252        let period_end = period_start + exceed_timeout_adjustment;
253        let environment_2 = "environment_2";
254        let period_data = event_coalescer.log_event_with_time(period_end, environment_2);
255
256        assert_eq!(
257            period_data,
258            Some((
259                period_start,
260                period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
261                environment_1
262            ))
263        );
264        assert_eq!(
265            event_coalescer.state,
266            Some(PeriodData {
267                start: period_end,
268                end: None,
269                environment: environment_2,
270            })
271        );
272    }
273    // 0                   20                  40                  60
274    // |-------------------|-------------------|-------------------|-------------------
275    // |--------|----------------------------------------env change
276    //          |-------------------|
277    // |period_start                |period_end
278    //                                                   |new_period_start
279}