1use std::time;
2use std::{sync::Arc, time::Instant};
3
4use clock::SystemClock;
5
/// Longest gap allowed between the end of the current period and a new event
/// for that event to still be merged into the period.
const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);

/// Length attributed to a period that only ever received a single event, so
/// that such a period still has an end later than its start.
const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
8
/// The period of activity currently being accumulated by an `EventCoalescer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PeriodData {
    /// Environment that every event in this period was logged under.
    environment: &'static str,
    /// Time of the first event in the period.
    start: Instant,
    /// Time of the most recent event merged into the period, or `None` while
    /// the period consists of its first event only.
    end: Option<Instant>,
}
15
16pub struct EventCoalescer {
17 clock: Arc<dyn SystemClock>,
18 state: Option<PeriodData>,
19}
20
21impl EventCoalescer {
22 pub fn new(clock: Arc<dyn SystemClock>) -> Self {
23 Self { clock, state: None }
24 }
25
26 pub fn log_event(
27 &mut self,
28 environment: &'static str,
29 ) -> Option<(Instant, Instant, &'static str)> {
30 let log_time = self.clock.utc_now();
31
32 let Some(state) = &mut self.state else {
33 self.state = Some(PeriodData {
34 start: log_time,
35 end: None,
36 environment,
37 });
38 return None;
39 };
40
41 let period_end = state
42 .end
43 .unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
44 let within_timeout = log_time - period_end < COALESCE_TIMEOUT;
45 let environment_is_same = state.environment == environment;
46 let should_coaelesce = !within_timeout || !environment_is_same;
47
48 if should_coaelesce {
49 let previous_environment = state.environment;
50 let original_start = state.start;
51
52 state.start = log_time;
53 state.end = None;
54 state.environment = environment;
55
56 return Some((
57 original_start,
58 if within_timeout { log_time } else { period_end },
59 previous_environment,
60 ));
61 }
62
63 state.end = Some(log_time);
64
65 None
66 }
67}
68
#[cfg(test)]
mod tests {
    use clock::FakeSystemClock;

    use super::*;

    /// Creates a coalescer on a fresh fake clock, logs the first event for
    /// `environment`, and checks that doing so opened a period without
    /// reporting one.
    ///
    /// Returns the clock, the coalescer, and the instant the period started at.
    fn coalescer_with_open_period(
        environment: &'static str,
    ) -> (Arc<FakeSystemClock>, EventCoalescer, Instant) {
        let clock = Arc::new(FakeSystemClock::new());
        let mut event_coalescer = EventCoalescer::new(clock.clone());

        assert_eq!(event_coalescer.state, None);

        let period_start = clock.utc_now();
        let period_data = event_coalescer.log_event(environment);

        assert_eq!(period_data, None);
        assert_eq!(
            event_coalescer.state,
            Some(PeriodData {
                start: period_start,
                end: None,
                environment,
            })
        );

        (clock, event_coalescer, period_start)
    }

    #[test]
    fn test_same_context_exceeding_timeout() {
        let environment_1 = "environment_1";
        let (clock, mut event_coalescer, period_start) =
            coalescer_with_open_period(environment_1);

        let within_timeout_adjustment = COALESCE_TIMEOUT / 2;

        // Ensure that many calls within the timeout don't start a new period
        for _ in 0..100 {
            clock.advance(within_timeout_adjustment);
            let period_data = event_coalescer.log_event(environment_1);
            let period_end = clock.utc_now();

            assert_eq!(period_data, None);
            assert_eq!(
                event_coalescer.state,
                Some(PeriodData {
                    start: period_start,
                    end: Some(period_end),
                    environment: environment_1,
                })
            );
        }

        let period_end = clock.utc_now();
        let exceed_timeout_adjustment = COALESCE_TIMEOUT * 2;
        // Logging an event exceeding the timeout should start a new period
        clock.advance(exceed_timeout_adjustment);
        let new_period_start = clock.utc_now();
        let period_data = event_coalescer.log_event(environment_1);

        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
        assert_eq!(
            event_coalescer.state,
            Some(PeriodData {
                start: new_period_start,
                end: None,
                environment: environment_1,
            })
        );
    }

    // Timeline (seconds since the first event):
    //
    // 0         10        20
    // |---------|---------|
    // ^ environment_1 (opens the period)
    //           ^ environment_1 (merged into the period)
    //                     ^ environment_2 (closes the period, opens a new one)
    //
    // Reported period: 0..20 for environment_1; the new period starts at 20.
    #[test]
    fn test_different_environment_under_timeout() {
        let environment_1 = "environment_1";
        let (clock, mut event_coalescer, period_start) =
            coalescer_with_open_period(environment_1);

        let within_timeout_adjustment = COALESCE_TIMEOUT / 2;
        clock.advance(within_timeout_adjustment);
        let period_end = clock.utc_now();
        let period_data = event_coalescer.log_event(environment_1);

        assert_eq!(period_data, None);
        assert_eq!(
            event_coalescer.state,
            Some(PeriodData {
                start: period_start,
                end: Some(period_end),
                environment: environment_1,
            })
        );

        clock.advance(within_timeout_adjustment);

        // Logging an event within the timeout but with a different environment
        // should start a new period. The old period ends where the new one begins.
        let period_end = clock.utc_now();
        let environment_2 = "environment_2";
        let period_data = event_coalescer.log_event(environment_2);

        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
        assert_eq!(
            event_coalescer.state,
            Some(PeriodData {
                start: period_end,
                end: None,
                environment: environment_2,
            })
        );
    }

    // Timeline (seconds since the first event):
    //
    // 0         10
    // |---------|
    // ^ environment_1 (opens the period)
    //           ^ environment_2 (closes the period, opens a new one)
    //
    // Reported period: 0..10 for environment_1; the new period starts at 10.
    #[test]
    fn test_switching_environment_while_within_timeout() {
        let environment_1 = "environment_1";
        let (clock, mut event_coalescer, period_start) =
            coalescer_with_open_period(environment_1);

        let within_timeout_adjustment = COALESCE_TIMEOUT / 2;
        clock.advance(within_timeout_adjustment);
        let period_end = clock.utc_now();
        let environment_2 = "environment_2";
        let period_data = event_coalescer.log_event(environment_2);

        assert_eq!(period_data, Some((period_start, period_end, environment_1)));
        assert_eq!(
            event_coalescer.state,
            Some(PeriodData {
                start: period_end,
                end: None,
                environment: environment_2,
            })
        );
    }

    // Timeline (seconds since the first event):
    //
    // 0                   20                  40
    // |-------------------|-------------------|
    // ^ environment_1 (opens the period)
    //                     ^ timeout elapses
    //                                         ^ environment_2 (opens a new period)
    //
    // Reported period: 0..(0 + SIMULATED_DURATION_FOR_SINGLE_EVENT) for
    // environment_1, because it only ever held one event; the new period
    // starts at 40.
    #[test]
    fn test_switching_environment_while_exceeding_timeout() {
        let environment_1 = "environment_1";
        let (clock, mut event_coalescer, period_start) =
            coalescer_with_open_period(environment_1);

        let exceed_timeout_adjustment = COALESCE_TIMEOUT * 2;
        clock.advance(exceed_timeout_adjustment);
        let new_period_start = clock.utc_now();
        let environment_2 = "environment_2";
        let period_data = event_coalescer.log_event(environment_2);

        assert_eq!(
            period_data,
            Some((
                period_start,
                period_start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
                environment_1
            ))
        );
        assert_eq!(
            event_coalescer.state,
            Some(PeriodData {
                start: new_period_start,
                end: None,
                environment: environment_2,
            })
        );
    }
}