event_coalescer.rs

  1use std::sync::Arc;
  2use std::time;
  3
  4use chrono::{DateTime, Duration, Utc};
  5use clock::SystemClock;
  6
  7const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);
  8const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
  9
 10#[derive(Debug, PartialEq)]
 11struct PeriodData {
 12    environment: &'static str,
 13    start: DateTime<Utc>,
 14    end: Option<DateTime<Utc>>,
 15}
 16
 17pub struct EventCoalescer {
 18    clock: Arc<dyn SystemClock>,
 19    state: Option<PeriodData>,
 20}
 21
 22impl EventCoalescer {
 23    pub fn new(clock: Arc<dyn SystemClock>) -> Self {
 24        Self { clock, state: None }
 25    }
 26
 27    pub fn log_event(
 28        &mut self,
 29        environment: &'static str,
 30    ) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
 31        let log_time = self.clock.utc_now();
 32        let coalesce_timeout = Duration::from_std(COALESCE_TIMEOUT).unwrap();
 33
 34        let Some(state) = &mut self.state else {
 35            self.state = Some(PeriodData {
 36                start: log_time,
 37                end: None,
 38                environment,
 39            });
 40            return None;
 41        };
 42
 43        let period_end = state
 44            .end
 45            .unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
 46        let within_timeout = log_time - period_end < coalesce_timeout;
 47        let environment_is_same = state.environment == environment;
 48        let should_coaelesce = !within_timeout || !environment_is_same;
 49
 50        if should_coaelesce {
 51            let previous_environment = state.environment;
 52            let original_start = state.start;
 53
 54            state.start = log_time;
 55            state.end = None;
 56            state.environment = environment;
 57
 58            return Some((
 59                original_start,
 60                if within_timeout { log_time } else { period_end },
 61                previous_environment,
 62            ));
 63        }
 64
 65        state.end = Some(log_time);
 66
 67        None
 68    }
 69}
 70
 71#[cfg(test)]
 72mod tests {
 73    use chrono::TimeZone;
 74    use clock::FakeSystemClock;
 75
 76    use super::*;
 77
 78    #[test]
 79    fn test_same_context_exceeding_timeout() {
 80        let clock = Arc::new(FakeSystemClock::new(
 81            Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
 82        ));
 83        let environment_1 = "environment_1";
 84        let mut event_coalescer = EventCoalescer::new(clock.clone());
 85
 86        assert_eq!(event_coalescer.state, None);
 87
 88        let period_start = clock.utc_now();
 89        let period_data = event_coalescer.log_event(environment_1);
 90
 91        assert_eq!(period_data, None);
 92        assert_eq!(
 93            event_coalescer.state,
 94            Some(PeriodData {
 95                start: period_start,
 96                end: None,
 97                environment: environment_1,
 98            })
 99        );
100
101        let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
102
103        // Ensure that many calls within the timeout don't start a new period
104        for _ in 0..100 {
105            clock.advance(within_timeout_adjustment);
106            let period_data = event_coalescer.log_event(environment_1);
107            let period_end = clock.utc_now();
108
109            assert_eq!(period_data, None);
110            assert_eq!(
111                event_coalescer.state,
112                Some(PeriodData {
113                    start: period_start,
114                    end: Some(period_end),
115                    environment: environment_1,
116                })
117            );
118        }
119
120        let period_end = clock.utc_now();
121        let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
122        // Logging an event exceeding the timeout should start a new period
123        clock.advance(exceed_timeout_adjustment);
124        let new_period_start = clock.utc_now();
125        let period_data = event_coalescer.log_event(environment_1);
126
127        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
128        assert_eq!(
129            event_coalescer.state,
130            Some(PeriodData {
131                start: new_period_start,
132                end: None,
133                environment: environment_1,
134            })
135        );
136    }
137
138    #[test]
139    fn test_different_environment_under_timeout() {
140        let clock = Arc::new(FakeSystemClock::new(
141            Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
142        ));
143        let environment_1 = "environment_1";
144        let mut event_coalescer = EventCoalescer::new(clock.clone());
145
146        assert_eq!(event_coalescer.state, None);
147
148        let period_start = clock.utc_now();
149        let period_data = event_coalescer.log_event(environment_1);
150
151        assert_eq!(period_data, None);
152        assert_eq!(
153            event_coalescer.state,
154            Some(PeriodData {
155                start: period_start,
156                end: None,
157                environment: environment_1,
158            })
159        );
160
161        let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
162        clock.advance(within_timeout_adjustment);
163        let period_end = clock.utc_now();
164        let period_data = event_coalescer.log_event(environment_1);
165
166        assert_eq!(period_data, None);
167        assert_eq!(
168            event_coalescer.state,
169            Some(PeriodData {
170                start: period_start,
171                end: Some(period_end),
172                environment: environment_1,
173            })
174        );
175
176        clock.advance(within_timeout_adjustment);
177
178        // Logging an event within the timeout but with a different environment should start a new period
179        let period_end = clock.utc_now();
180        let environment_2 = "environment_2";
181        let period_data = event_coalescer.log_event(environment_2);
182
183        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
184        assert_eq!(
185            event_coalescer.state,
186            Some(PeriodData {
187                start: period_end,
188                end: None,
189                environment: environment_2,
190            })
191        );
192    }
193
194    #[test]
195    fn test_switching_environment_while_within_timeout() {
196        let clock = Arc::new(FakeSystemClock::new(
197            Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
198        ));
199        let environment_1 = "environment_1";
200        let mut event_coalescer = EventCoalescer::new(clock.clone());
201
202        assert_eq!(event_coalescer.state, None);
203
204        let period_start = clock.utc_now();
205        let period_data = event_coalescer.log_event(environment_1);
206
207        assert_eq!(period_data, None);
208        assert_eq!(
209            event_coalescer.state,
210            Some(PeriodData {
211                start: period_start,
212                end: None,
213                environment: environment_1,
214            })
215        );
216
217        let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
218        clock.advance(within_timeout_adjustment);
219        let period_end = clock.utc_now();
220        let environment_2 = "environment_2";
221        let period_data = event_coalescer.log_event(environment_2);
222
223        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
224        assert_eq!(
225            event_coalescer.state,
226            Some(PeriodData {
227                start: period_end,
228                end: None,
229                environment: environment_2,
230            })
231        );
232    }
233
234    // 0                   20                  40                  60
235    // |-------------------|-------------------|-------------------|-------------------
236    // |--------|----------env change
237    //          |-------------------
238    // |period_start       |period_end
239    //                     |new_period_start
240
241    #[test]
242    fn test_switching_environment_while_exceeding_timeout() {
243        let clock = Arc::new(FakeSystemClock::new(
244            Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
245        ));
246        let environment_1 = "environment_1";
247        let mut event_coalescer = EventCoalescer::new(clock.clone());
248
249        assert_eq!(event_coalescer.state, None);
250
251        let period_start = clock.utc_now();
252        let period_data = event_coalescer.log_event(environment_1);
253
254        assert_eq!(period_data, None);
255        assert_eq!(
256            event_coalescer.state,
257            Some(PeriodData {
258                start: period_start,
259                end: None,
260                environment: environment_1,
261            })
262        );
263
264        let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
265        clock.advance(exceed_timeout_adjustment);
266        let period_end = clock.utc_now();
267        let environment_2 = "environment_2";
268        let period_data = event_coalescer.log_event(environment_2);
269
270        assert_eq!(
271            period_data,
272            Some((
273                period_start,
274                period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
275                environment_1
276            ))
277        );
278        assert_eq!(
279            event_coalescer.state,
280            Some(PeriodData {
281                start: period_end,
282                end: None,
283                environment: environment_2,
284            })
285        );
286    }
287
288    // 0                   20                  40                  60
289    // |-------------------|-------------------|-------------------|-------------------
290    // |--------|----------------------------------------env change
291    //          |-------------------|
292    // |period_start                |period_end
293    //                                                   |new_period_start
294}