1use chrono::{DateTime, Duration, Utc};
2use std::time;
3
/// Maximum gap between the end of the current period and a new event for the
/// new event to still be folded into that period (20 seconds).
const COALESCE_TIMEOUT: time::Duration = time::Duration::from_secs(20);

/// Length attributed to a period that contains only a single event, i.e. one
/// whose `end` was never set (1 millisecond).
const SIMULATED_DURATION_FOR_SINGLE_EVENT: time::Duration = time::Duration::from_millis(1);
6
7#[derive(Debug, PartialEq)]
8struct PeriodData {
9 environment: &'static str,
10 start: DateTime<Utc>,
11 end: Option<DateTime<Utc>>,
12}
13
14pub struct EventCoalescer {
15 state: Option<PeriodData>,
16}
17
18impl EventCoalescer {
19 pub fn new() -> Self {
20 Self { state: None }
21 }
22
23 pub fn log_event(
24 &mut self,
25 environment: &'static str,
26 ) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
27 self.log_event_with_time(Utc::now(), environment)
28 }
29
30 // pub fn close_current_period(&mut self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
31 // self.environment.map(|env| self.log_event(env)).flatten()
32 // }
33
34 fn log_event_with_time(
35 &mut self,
36 log_time: DateTime<Utc>,
37 environment: &'static str,
38 ) -> Option<(DateTime<Utc>, DateTime<Utc>, &'static str)> {
39 let coalesce_timeout = Duration::from_std(COALESCE_TIMEOUT).unwrap();
40
41 let Some(state) = &mut self.state else {
42 self.state = Some(PeriodData {
43 start: log_time,
44 end: None,
45 environment,
46 });
47 return None;
48 };
49
50 let period_end = state
51 .end
52 .unwrap_or(state.start + SIMULATED_DURATION_FOR_SINGLE_EVENT);
53 let within_timeout = log_time - period_end < coalesce_timeout;
54 let environment_is_same = state.environment == environment;
55 let should_coaelesce = !within_timeout || !environment_is_same;
56
57 if should_coaelesce {
58 let previous_environment = state.environment;
59 let original_start = state.start;
60
61 state.start = log_time;
62 state.end = None;
63 state.environment = environment;
64
65 return Some((
66 original_start,
67 if within_timeout { log_time } else { period_end },
68 previous_environment,
69 ));
70 }
71
72 state.end = Some(log_time);
73
74 None
75 }
76}
77
#[cfg(test)]
mod tests {
    use chrono::TimeZone;

    use super::*;

    const ENVIRONMENT_1: &str = "environment_1";
    const ENVIRONMENT_2: &str = "environment_2";

    /// Fixed instant used as the time of the first event in every test.
    fn first_event_time() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(1990, 4, 12, 0, 0, 0).unwrap()
    }

    /// A step that keeps consecutive events inside the coalescing window.
    fn half_timeout() -> Duration {
        Duration::from_std(COALESCE_TIMEOUT / 2).unwrap()
    }

    /// A step that puts the next event outside the coalescing window.
    fn double_timeout() -> Duration {
        Duration::from_std(COALESCE_TIMEOUT * 2).unwrap()
    }

    /// Builds the value `EventCoalescer::state` is expected to hold.
    fn tracked(
        start: DateTime<Utc>,
        end: Option<DateTime<Utc>>,
        environment: &'static str,
    ) -> Option<PeriodData> {
        Some(PeriodData {
            environment,
            start,
            end,
        })
    }

    /// Creates a coalescer, checks that it starts out empty, logs the first
    /// event for `ENVIRONMENT_1` and checks that this opens a period without
    /// closing anything. Returns the coalescer and the period's start time.
    fn coalescer_after_first_event() -> (EventCoalescer, DateTime<Utc>) {
        let mut coalescer = EventCoalescer::new();
        assert_eq!(coalescer.state, None);

        let start = first_event_time();
        let closed = coalescer.log_event_with_time(start, ENVIRONMENT_1);

        assert_eq!(closed, None);
        assert_eq!(coalescer.state, tracked(start, None, ENVIRONMENT_1));

        (coalescer, start)
    }

    #[test]
    fn test_same_context_exceeding_timeout() {
        let (mut coalescer, start) = coalescer_after_first_event();

        // Many events, each within the timeout of the previous one, must all
        // be merged into the same period.
        let step = half_timeout();
        let mut last_event = start;
        for _ in 0..100 {
            last_event += step;
            let closed = coalescer.log_event_with_time(last_event, ENVIRONMENT_1);

            assert_eq!(closed, None);
            assert_eq!(
                coalescer.state,
                tracked(start, Some(last_event), ENVIRONMENT_1)
            );
        }

        // An event beyond the timeout closes the period at its last event and
        // opens a new one.
        let next_start = last_event + double_timeout();
        let closed = coalescer.log_event_with_time(next_start, ENVIRONMENT_1);

        assert_eq!(closed, Some((start, last_event, ENVIRONMENT_1)));
        assert_eq!(coalescer.state, tracked(next_start, None, ENVIRONMENT_1));
    }

    #[test]
    fn test_different_environment_under_timeout() {
        let (mut coalescer, start) = coalescer_after_first_event();
        let step = half_timeout();

        // A second event in the same environment extends the period.
        let second_event = start + step;
        let closed = coalescer.log_event_with_time(second_event, ENVIRONMENT_1);

        assert_eq!(closed, None);
        assert_eq!(
            coalescer.state,
            tracked(start, Some(second_event), ENVIRONMENT_1)
        );

        // An event within the timeout but for another environment closes the
        // period at the time of that event and opens a new one.
        let switch_time = second_event + step;
        let closed = coalescer.log_event_with_time(switch_time, ENVIRONMENT_2);

        assert_eq!(closed, Some((start, switch_time, ENVIRONMENT_1)));
        assert_eq!(coalescer.state, tracked(switch_time, None, ENVIRONMENT_2));
    }

    // Timeline: one `environment_1` event at `start`, then an `environment_2`
    // event half a timeout later. The single-event period is reported as
    // ending at the moment of the switch.
    #[test]
    fn test_switching_environment_while_within_timeout() {
        let (mut coalescer, start) = coalescer_after_first_event();

        let switch_time = start + half_timeout();
        let closed = coalescer.log_event_with_time(switch_time, ENVIRONMENT_2);

        assert_eq!(closed, Some((start, switch_time, ENVIRONMENT_1)));
        assert_eq!(coalescer.state, tracked(switch_time, None, ENVIRONMENT_2));
    }

    // Timeline: one `environment_1` event at `start`, then an `environment_2`
    // event two timeouts later. The single-event period is reported with the
    // simulated duration rather than stretching to the switch.
    #[test]
    fn test_switching_environment_while_exceeding_timeout() {
        let (mut coalescer, start) = coalescer_after_first_event();

        let switch_time = start + double_timeout();
        let closed = coalescer.log_event_with_time(switch_time, ENVIRONMENT_2);

        assert_eq!(
            closed,
            Some((
                start,
                start + SIMULATED_DURATION_FOR_SINGLE_EVENT,
                ENVIRONMENT_1
            ))
        );
        assert_eq!(coalescer.state, tracked(switch_time, None, ENVIRONMENT_2));
    }
}