event_coalescer.rs

  1use std::time;
  2use std::{sync::Arc, time::Instant};
  3
  4use clock::SystemClock;
  5
  6const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);
  7const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
  8
  9#[derive(Debug, PartialEq)]
 10struct PeriodData {
 11    environment: &'static str,
 12    start: Instant,
 13    end: Option<Instant>,
 14}
 15
 16pub struct EventCoalescer {
 17    clock: Arc<dyn SystemClock>,
 18    state: Option<PeriodData>,
 19}
 20
 21impl EventCoalescer {
 22    pub fn new(clock: Arc<dyn SystemClock>) -> Self {
 23        Self { clock, state: None }
 24    }
 25
 26    pub fn log_event(
 27        &mut self,
 28        environment: &'static str,
 29    ) -> Option<(Instant, Instant, &'static str)> {
 30        let log_time = self.clock.utc_now();
 31
 32        let Some(state) = &mut self.state else {
 33            self.state = Some(PeriodData {
 34                start: log_time,
 35                end: None,
 36                environment,
 37            });
 38            return None;
 39        };
 40
 41        let period_end = state
 42            .end
 43            .unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
 44        let within_timeout = log_time - period_end < COALESCE_TIMEOUT;
 45        let environment_is_same = state.environment == environment;
 46        let should_coaelesce = !within_timeout || !environment_is_same;
 47
 48        if should_coaelesce {
 49            let previous_environment = state.environment;
 50            let original_start = state.start;
 51
 52            state.start = log_time;
 53            state.end = None;
 54            state.environment = environment;
 55
 56            return Some((
 57                original_start,
 58                if within_timeout { log_time } else { period_end },
 59                previous_environment,
 60            ));
 61        }
 62
 63        state.end = Some(log_time);
 64
 65        None
 66    }
 67}
 68
 69#[cfg(test)]
 70mod tests {
 71    use clock::FakeSystemClock;
 72
 73    use super::*;
 74
 75    #[test]
 76    fn test_same_context_exceeding_timeout() {
 77        let clock = Arc::new(FakeSystemClock::new());
 78        let environment_1 = "environment_1";
 79        let mut event_coalescer = EventCoalescer::new(clock.clone());
 80
 81        assert_eq!(event_coalescer.state, None);
 82
 83        let period_start = clock.utc_now();
 84        let period_data = event_coalescer.log_event(environment_1);
 85
 86        assert_eq!(period_data, None);
 87        assert_eq!(
 88            event_coalescer.state,
 89            Some(PeriodData {
 90                start: period_start,
 91                end: None,
 92                environment: environment_1,
 93            })
 94        );
 95
 96        let within_timeout_adjustment = COALESCE_TIMEOUT / 2;
 97
 98        // Ensure that many calls within the timeout don't start a new period
 99        for _ in 0..100 {
100            clock.advance(within_timeout_adjustment);
101            let period_data = event_coalescer.log_event(environment_1);
102            let period_end = clock.utc_now();
103
104            assert_eq!(period_data, None);
105            assert_eq!(
106                event_coalescer.state,
107                Some(PeriodData {
108                    start: period_start,
109                    end: Some(period_end),
110                    environment: environment_1,
111                })
112            );
113        }
114
115        let period_end = clock.utc_now();
116        let exceed_timeout_adjustment = COALESCE_TIMEOUT * 2;
117        // Logging an event exceeding the timeout should start a new period
118        clock.advance(exceed_timeout_adjustment);
119        let new_period_start = clock.utc_now();
120        let period_data = event_coalescer.log_event(environment_1);
121
122        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
123        assert_eq!(
124            event_coalescer.state,
125            Some(PeriodData {
126                start: new_period_start,
127                end: None,
128                environment: environment_1,
129            })
130        );
131    }
132
133    #[test]
134    fn test_different_environment_under_timeout() {
135        let clock = Arc::new(FakeSystemClock::new());
136        let environment_1 = "environment_1";
137        let mut event_coalescer = EventCoalescer::new(clock.clone());
138
139        assert_eq!(event_coalescer.state, None);
140
141        let period_start = clock.utc_now();
142        let period_data = event_coalescer.log_event(environment_1);
143
144        assert_eq!(period_data, None);
145        assert_eq!(
146            event_coalescer.state,
147            Some(PeriodData {
148                start: period_start,
149                end: None,
150                environment: environment_1,
151            })
152        );
153
154        let within_timeout_adjustment = COALESCE_TIMEOUT / 2;
155        clock.advance(within_timeout_adjustment);
156        let period_end = clock.utc_now();
157        let period_data = event_coalescer.log_event(environment_1);
158
159        assert_eq!(period_data, None);
160        assert_eq!(
161            event_coalescer.state,
162            Some(PeriodData {
163                start: period_start,
164                end: Some(period_end),
165                environment: environment_1,
166            })
167        );
168
169        clock.advance(within_timeout_adjustment);
170
171        // Logging an event within the timeout but with a different environment should start a new period
172        let period_end = clock.utc_now();
173        let environment_2 = "environment_2";
174        let period_data = event_coalescer.log_event(environment_2);
175
176        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
177        assert_eq!(
178            event_coalescer.state,
179            Some(PeriodData {
180                start: period_end,
181                end: None,
182                environment: environment_2,
183            })
184        );
185    }
186
187    #[test]
188    fn test_switching_environment_while_within_timeout() {
189        let clock = Arc::new(FakeSystemClock::new());
190        let environment_1 = "environment_1";
191        let mut event_coalescer = EventCoalescer::new(clock.clone());
192
193        assert_eq!(event_coalescer.state, None);
194
195        let period_start = clock.utc_now();
196        let period_data = event_coalescer.log_event(environment_1);
197
198        assert_eq!(period_data, None);
199        assert_eq!(
200            event_coalescer.state,
201            Some(PeriodData {
202                start: period_start,
203                end: None,
204                environment: environment_1,
205            })
206        );
207
208        let within_timeout_adjustment = COALESCE_TIMEOUT / 2;
209        clock.advance(within_timeout_adjustment);
210        let period_end = clock.utc_now();
211        let environment_2 = "environment_2";
212        let period_data = event_coalescer.log_event(environment_2);
213
214        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
215        assert_eq!(
216            event_coalescer.state,
217            Some(PeriodData {
218                start: period_end,
219                end: None,
220                environment: environment_2,
221            })
222        );
223    }
224
225    // 0                   20                  40                  60
226    // |-------------------|-------------------|-------------------|-------------------
227    // |--------|----------env change
228    //          |-------------------
229    // |period_start       |period_end
230    //                     |new_period_start
231
232    #[test]
233    fn test_switching_environment_while_exceeding_timeout() {
234        let clock = Arc::new(FakeSystemClock::new());
235        let environment_1 = "environment_1";
236        let mut event_coalescer = EventCoalescer::new(clock.clone());
237
238        assert_eq!(event_coalescer.state, None);
239
240        let period_start = clock.utc_now();
241        let period_data = event_coalescer.log_event(environment_1);
242
243        assert_eq!(period_data, None);
244        assert_eq!(
245            event_coalescer.state,
246            Some(PeriodData {
247                start: period_start,
248                end: None,
249                environment: environment_1,
250            })
251        );
252
253        let exceed_timeout_adjustment = COALESCE_TIMEOUT * 2;
254        clock.advance(exceed_timeout_adjustment);
255        let period_end = clock.utc_now();
256        let environment_2 = "environment_2";
257        let period_data = event_coalescer.log_event(environment_2);
258
259        assert_eq!(
260            period_data,
261            Some((
262                period_start,
263                period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
264                environment_1
265            ))
266        );
267        assert_eq!(
268            event_coalescer.state,
269            Some(PeriodData {
270                start: period_end,
271                end: None,
272                environment: environment_2,
273            })
274        );
275    }
276
277    // 0                   20                  40                  60
278    // |-------------------|-------------------|-------------------|-------------------
279    // |--------|----------------------------------------env change
280    //          |-------------------|
281    // |period_start                |period_end
282    //                                                   |new_period_start
283}