1use std::sync::Arc;
2use std::time;
3
4use chrono::{DateTime, Duration, Utc};
5use clock::SystemClock;
6
7const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);
8const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
9
10#[derive(Debug, PartialEq)]
11struct PeriodData {
12 environment: &'static str,
13 start: DateTime<Utc>,
14 end: Option<DateTime<Utc>>,
15}
16
17pub struct EventCoalescer {
18 clock: Arc<dyn SystemClock>,
19 state: Option<PeriodData>,
20}
21
22impl EventCoalescer {
23 pub fn new(clock: Arc<dyn SystemClock>) -> Self {
24 Self { clock, state: None }
25 }
26
27 pub fn log_event(
28 &mut self,
29 environment: &'static str,
30 ) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
31 let log_time = self.clock.utc_now();
32 let coalesce_timeout = Duration::from_std(COALESCE_TIMEOUT).unwrap();
33
34 let Some(state) = &mut self.state else {
35 self.state = Some(PeriodData {
36 start: log_time,
37 end: None,
38 environment,
39 });
40 return None;
41 };
42
43 let period_end = state
44 .end
45 .unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
46 let within_timeout = log_time - period_end < coalesce_timeout;
47 let environment_is_same = state.environment == environment;
48 let should_coaelesce = !within_timeout || !environment_is_same;
49
50 if should_coaelesce {
51 let previous_environment = state.environment;
52 let original_start = state.start;
53
54 state.start = log_time;
55 state.end = None;
56 state.environment = environment;
57
58 return Some((
59 original_start,
60 if within_timeout { log_time } else { period_end },
61 previous_environment,
62 ));
63 }
64
65 state.end = Some(log_time);
66
67 None
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use chrono::TimeZone;
74 use clock::FakeSystemClock;
75
76 use super::*;
77
78 #[test]
79 fn test_same_context_exceeding_timeout() {
80 let clock = Arc::new(FakeSystemClock::new(
81 Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
82 ));
83 let environment_1 = "environment_1";
84 let mut event_coalescer = EventCoalescer::new(clock.clone());
85
86 assert_eq!(event_coalescer.state, None);
87
88 let period_start = clock.utc_now();
89 let period_data = event_coalescer.log_event(environment_1);
90
91 assert_eq!(period_data, None);
92 assert_eq!(
93 event_coalescer.state,
94 Some(PeriodData {
95 start: period_start,
96 end: None,
97 environment: environment_1,
98 })
99 );
100
101 let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
102
103 // Ensure that many calls within the timeout don't start a new period
104 for _ in 0..100 {
105 clock.advance(within_timeout_adjustment);
106 let period_data = event_coalescer.log_event(environment_1);
107 let period_end = clock.utc_now();
108
109 assert_eq!(period_data, None);
110 assert_eq!(
111 event_coalescer.state,
112 Some(PeriodData {
113 start: period_start,
114 end: Some(period_end),
115 environment: environment_1,
116 })
117 );
118 }
119
120 let period_end = clock.utc_now();
121 let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
122 // Logging an event exceeding the timeout should start a new period
123 clock.advance(exceed_timeout_adjustment);
124 let new_period_start = clock.utc_now();
125 let period_data = event_coalescer.log_event(environment_1);
126
127 assert_eq!(period_data, Some((period_start, period_end, environment_1)));
128 assert_eq!(
129 event_coalescer.state,
130 Some(PeriodData {
131 start: new_period_start,
132 end: None,
133 environment: environment_1,
134 })
135 );
136 }
137
138 #[test]
139 fn test_different_environment_under_timeout() {
140 let clock = Arc::new(FakeSystemClock::new(
141 Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
142 ));
143 let environment_1 = "environment_1";
144 let mut event_coalescer = EventCoalescer::new(clock.clone());
145
146 assert_eq!(event_coalescer.state, None);
147
148 let period_start = clock.utc_now();
149 let period_data = event_coalescer.log_event(environment_1);
150
151 assert_eq!(period_data, None);
152 assert_eq!(
153 event_coalescer.state,
154 Some(PeriodData {
155 start: period_start,
156 end: None,
157 environment: environment_1,
158 })
159 );
160
161 let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
162 clock.advance(within_timeout_adjustment);
163 let period_end = clock.utc_now();
164 let period_data = event_coalescer.log_event(environment_1);
165
166 assert_eq!(period_data, None);
167 assert_eq!(
168 event_coalescer.state,
169 Some(PeriodData {
170 start: period_start,
171 end: Some(period_end),
172 environment: environment_1,
173 })
174 );
175
176 clock.advance(within_timeout_adjustment);
177
178 // Logging an event within the timeout but with a different environment should start a new period
179 let period_end = clock.utc_now();
180 let environment_2 = "environment_2";
181 let period_data = event_coalescer.log_event(environment_2);
182
183 assert_eq!(period_data, Some((period_start, period_end, environment_1)));
184 assert_eq!(
185 event_coalescer.state,
186 Some(PeriodData {
187 start: period_end,
188 end: None,
189 environment: environment_2,
190 })
191 );
192 }
193
194 #[test]
195 fn test_switching_environment_while_within_timeout() {
196 let clock = Arc::new(FakeSystemClock::new(
197 Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
198 ));
199 let environment_1 = "environment_1";
200 let mut event_coalescer = EventCoalescer::new(clock.clone());
201
202 assert_eq!(event_coalescer.state, None);
203
204 let period_start = clock.utc_now();
205 let period_data = event_coalescer.log_event(environment_1);
206
207 assert_eq!(period_data, None);
208 assert_eq!(
209 event_coalescer.state,
210 Some(PeriodData {
211 start: period_start,
212 end: None,
213 environment: environment_1,
214 })
215 );
216
217 let within_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT / 2).unwrap();
218 clock.advance(within_timeout_adjustment);
219 let period_end = clock.utc_now();
220 let environment_2 = "environment_2";
221 let period_data = event_coalescer.log_event(environment_2);
222
223 assert_eq!(period_data, Some((period_start, period_end, environment_1)));
224 assert_eq!(
225 event_coalescer.state,
226 Some(PeriodData {
227 start: period_end,
228 end: None,
229 environment: environment_2,
230 })
231 );
232 }
233
234 // 0 20 40 60
235 // |-------------------|-------------------|-------------------|-------------------
236 // |--------|----------env change
237 // |-------------------
238 // |period_start |period_end
239 // |new_period_start
240
241 #[test]
242 fn test_switching_environment_while_exceeding_timeout() {
243 let clock = Arc::new(FakeSystemClock::new(
244 Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap(),
245 ));
246 let environment_1 = "environment_1";
247 let mut event_coalescer = EventCoalescer::new(clock.clone());
248
249 assert_eq!(event_coalescer.state, None);
250
251 let period_start = clock.utc_now();
252 let period_data = event_coalescer.log_event(environment_1);
253
254 assert_eq!(period_data, None);
255 assert_eq!(
256 event_coalescer.state,
257 Some(PeriodData {
258 start: period_start,
259 end: None,
260 environment: environment_1,
261 })
262 );
263
264 let exceed_timeout_adjustment = Duration::from_std(COALESCE_TIMEOUT * 2).unwrap();
265 clock.advance(exceed_timeout_adjustment);
266 let period_end = clock.utc_now();
267 let environment_2 = "environment_2";
268 let period_data = event_coalescer.log_event(environment_2);
269
270 assert_eq!(
271 period_data,
272 Some((
273 period_start,
274 period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
275 environment_1
276 ))
277 );
278 assert_eq!(
279 event_coalescer.state,
280 Some(PeriodData {
281 start: period_end,
282 end: None,
283 environment: environment_2,
284 })
285 );
286 }
287
288 // 0 20 40 60
289 // |-------------------|-------------------|-------------------|-------------------
290 // |--------|----------------------------------------env change
291 // |-------------------|
292 // |period_start |period_end
293 // |new_period_start
294}