1use std::sync::{Arc, OnceLock};
2
3use anyhow::{anyhow, Context};
4use axum::{
5 body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
6};
7use hyper::StatusCode;
8use serde::{Serialize, Serializer};
9use sha2::{Digest, Sha256};
10use telemetry_events::{
11 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
12 EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
13};
14
15use crate::{AppState, Error, Result};
16
17pub fn router() -> Router {
18 Router::new().route("/telemetry/events", post(post_events))
19}
20
21pub struct ZedChecksumHeader(Vec<u8>);
22
23impl Header for ZedChecksumHeader {
24 fn name() -> &'static HeaderName {
25 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
26 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
27 }
28
29 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
30 where
31 Self: Sized,
32 I: Iterator<Item = &'i axum::http::HeaderValue>,
33 {
34 let checksum = values
35 .next()
36 .ok_or_else(axum::headers::Error::invalid)?
37 .to_str()
38 .map_err(|_| axum::headers::Error::invalid())?;
39
40 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
41 Ok(Self(bytes))
42 }
43
44 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
45 unimplemented!()
46 }
47}
48
49pub struct CloudflareIpCountryHeader(String);
50
51impl Header for CloudflareIpCountryHeader {
52 fn name() -> &'static HeaderName {
53 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
54 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
55 }
56
57 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
58 where
59 Self: Sized,
60 I: Iterator<Item = &'i axum::http::HeaderValue>,
61 {
62 let country_code = values
63 .next()
64 .ok_or_else(axum::headers::Error::invalid)?
65 .to_str()
66 .map_err(|_| axum::headers::Error::invalid())?;
67
68 Ok(Self(country_code.to_string()))
69 }
70
71 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
72 unimplemented!()
73 }
74}
75
76pub async fn post_events(
77 Extension(app): Extension<Arc<AppState>>,
78 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
79 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
80 body: Bytes,
81) -> Result<()> {
82 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
83 Err(Error::Http(
84 StatusCode::NOT_IMPLEMENTED,
85 "not supported".into(),
86 ))?
87 };
88
89 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
90 return Err(Error::Http(
91 StatusCode::INTERNAL_SERVER_ERROR,
92 "events not enabled".into(),
93 ))?;
94 };
95
96 let mut summer = Sha256::new();
97 summer.update(checksum_seed);
98 summer.update(&body);
99 summer.update(checksum_seed);
100
101 if &checksum[..] != &summer.finalize()[..] {
102 return Err(Error::Http(
103 StatusCode::BAD_REQUEST,
104 "invalid checksum".into(),
105 ))?;
106 }
107
108 let request_body: telemetry_events::EventRequestBody =
109 serde_json::from_slice(&body).map_err(|err| {
110 log::error!("can't parse event json: {err}");
111 Error::Internal(anyhow!(err))
112 })?;
113
114 let mut to_upload = ToUpload::default();
115 let Some(last_event) = request_body.events.last() else {
116 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
117 };
118 let country_code = country_code_header.map(|h| h.0 .0);
119
120 let first_event_at = chrono::Utc::now()
121 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
122
123 for wrapper in &request_body.events {
124 match &wrapper.event {
125 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
126 event.clone(),
127 &wrapper,
128 &request_body,
129 first_event_at,
130 country_code.clone(),
131 )),
132 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
133 event.clone(),
134 &wrapper,
135 &request_body,
136 first_event_at,
137 country_code.clone(),
138 )),
139 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
140 event.clone(),
141 &wrapper,
142 &request_body,
143 first_event_at,
144 )),
145 Event::Assistant(event) => {
146 to_upload
147 .assistant_events
148 .push(AssistantEventRow::from_event(
149 event.clone(),
150 &wrapper,
151 &request_body,
152 first_event_at,
153 ))
154 }
155 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
156 event.clone(),
157 &wrapper,
158 &request_body,
159 first_event_at,
160 )),
161 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
162 event.clone(),
163 &wrapper,
164 &request_body,
165 first_event_at,
166 )),
167 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
168 event.clone(),
169 &wrapper,
170 &request_body,
171 first_event_at,
172 )),
173 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
174 event.clone(),
175 &wrapper,
176 &request_body,
177 first_event_at,
178 )),
179 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
180 event.clone(),
181 &wrapper,
182 &request_body,
183 first_event_at,
184 )),
185 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
186 event.clone(),
187 &wrapper,
188 &request_body,
189 first_event_at,
190 )),
191 }
192 }
193
194 to_upload
195 .upload(&clickhouse_client)
196 .await
197 .map_err(|err| Error::Internal(anyhow!(err)))?;
198
199 Ok(())
200}
201
202#[derive(Default)]
203struct ToUpload {
204 editor_events: Vec<EditorEventRow>,
205 copilot_events: Vec<CopilotEventRow>,
206 assistant_events: Vec<AssistantEventRow>,
207 call_events: Vec<CallEventRow>,
208 cpu_events: Vec<CpuEventRow>,
209 memory_events: Vec<MemoryEventRow>,
210 app_events: Vec<AppEventRow>,
211 setting_events: Vec<SettingEventRow>,
212 edit_events: Vec<EditEventRow>,
213 action_events: Vec<ActionEventRow>,
214}
215
216impl ToUpload {
217 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
218 Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
219 .await
220 .with_context(|| format!("failed to upload to table 'editor_events'"))?;
221 Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
222 .await
223 .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
224 Self::upload_to_table(
225 "assistant_events",
226 &self.assistant_events,
227 clickhouse_client,
228 )
229 .await
230 .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
231 Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
232 .await
233 .with_context(|| format!("failed to upload to table 'call_events'"))?;
234 Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
235 .await
236 .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
237 Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
238 .await
239 .with_context(|| format!("failed to upload to table 'memory_events'"))?;
240 Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
241 .await
242 .with_context(|| format!("failed to upload to table 'app_events'"))?;
243 Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
244 .await
245 .with_context(|| format!("failed to upload to table 'setting_events'"))?;
246 Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
247 .await
248 .with_context(|| format!("failed to upload to table 'edit_events'"))?;
249 Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
250 .await
251 .with_context(|| format!("failed to upload to table 'action_events'"))?;
252 Ok(())
253 }
254
255 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
256 table: &str,
257 rows: &[T],
258 clickhouse_client: &clickhouse::Client,
259 ) -> anyhow::Result<()> {
260 if !rows.is_empty() {
261 let mut insert = clickhouse_client.insert(table)?;
262
263 for event in rows {
264 insert.write(event).await?;
265 }
266
267 insert.end().await?;
268 }
269
270 Ok(())
271 }
272}
273
274pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
275where
276 S: Serializer,
277{
278 if country_code.len() != 2 {
279 use serde::ser::Error;
280 return Err(S::Error::custom(
281 "country_code must be exactly 2 characters",
282 ));
283 }
284
285 let country_code = country_code.as_bytes();
286
287 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
288}
289
290#[derive(Serialize, Debug, clickhouse::Row)]
291pub struct EditorEventRow {
292 pub installation_id: String,
293 pub operation: String,
294 pub app_version: String,
295 pub file_extension: String,
296 pub os_name: String,
297 pub os_version: String,
298 pub release_channel: String,
299 pub signed_in: bool,
300 pub vim_mode: bool,
301 #[serde(serialize_with = "serialize_country_code")]
302 pub country_code: String,
303 pub region_code: String,
304 pub city: String,
305 pub time: i64,
306 pub copilot_enabled: bool,
307 pub copilot_enabled_for_language: bool,
308 pub historical_event: bool,
309 pub architecture: String,
310 pub is_staff: Option<bool>,
311 pub session_id: Option<String>,
312 pub major: Option<i32>,
313 pub minor: Option<i32>,
314 pub patch: Option<i32>,
315}
316
317impl EditorEventRow {
318 fn from_event(
319 event: EditorEvent,
320 wrapper: &EventWrapper,
321 body: &EventRequestBody,
322 first_event_at: chrono::DateTime<chrono::Utc>,
323 country_code: Option<String>,
324 ) -> Self {
325 let semver = body.semver();
326 let time =
327 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
328
329 Self {
330 app_version: body.app_version.clone(),
331 major: semver.map(|s| s.major as i32),
332 minor: semver.map(|s| s.minor as i32),
333 patch: semver.map(|s| s.patch as i32),
334 release_channel: body.release_channel.clone().unwrap_or_default(),
335 os_name: body.os_name.clone(),
336 os_version: body.os_version.clone().unwrap_or_default(),
337 architecture: body.architecture.clone(),
338 installation_id: body.installation_id.clone().unwrap_or_default(),
339 session_id: body.session_id.clone(),
340 is_staff: body.is_staff,
341 time: time.timestamp_millis(),
342 operation: event.operation,
343 file_extension: event.file_extension.unwrap_or_default(),
344 signed_in: wrapper.signed_in,
345 vim_mode: event.vim_mode,
346 copilot_enabled: event.copilot_enabled,
347 copilot_enabled_for_language: event.copilot_enabled_for_language,
348 country_code: country_code.unwrap_or("XX".to_string()),
349 region_code: "".to_string(),
350 city: "".to_string(),
351 historical_event: false,
352 }
353 }
354}
355
356#[derive(Serialize, Debug, clickhouse::Row)]
357pub struct CopilotEventRow {
358 pub installation_id: String,
359 pub suggestion_id: String,
360 pub suggestion_accepted: bool,
361 pub app_version: String,
362 pub file_extension: String,
363 pub os_name: String,
364 pub os_version: String,
365 pub release_channel: String,
366 pub signed_in: bool,
367 #[serde(serialize_with = "serialize_country_code")]
368 pub country_code: String,
369 pub region_code: String,
370 pub city: String,
371 pub time: i64,
372 pub is_staff: Option<bool>,
373 pub session_id: Option<String>,
374 pub major: Option<i32>,
375 pub minor: Option<i32>,
376 pub patch: Option<i32>,
377}
378
379impl CopilotEventRow {
380 fn from_event(
381 event: CopilotEvent,
382 wrapper: &EventWrapper,
383 body: &EventRequestBody,
384 first_event_at: chrono::DateTime<chrono::Utc>,
385 country_code: Option<String>,
386 ) -> Self {
387 let semver = body.semver();
388 let time =
389 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
390
391 Self {
392 app_version: body.app_version.clone(),
393 major: semver.map(|s| s.major as i32),
394 minor: semver.map(|s| s.minor as i32),
395 patch: semver.map(|s| s.patch as i32),
396 release_channel: body.release_channel.clone().unwrap_or_default(),
397 os_name: body.os_name.clone(),
398 os_version: body.os_version.clone().unwrap_or_default(),
399 installation_id: body.installation_id.clone().unwrap_or_default(),
400 session_id: body.session_id.clone(),
401 is_staff: body.is_staff,
402 time: time.timestamp_millis(),
403 file_extension: event.file_extension.unwrap_or_default(),
404 signed_in: wrapper.signed_in,
405 country_code: country_code.unwrap_or("XX".to_string()),
406 region_code: "".to_string(),
407 city: "".to_string(),
408 suggestion_id: event.suggestion_id.unwrap_or_default(),
409 suggestion_accepted: event.suggestion_accepted,
410 }
411 }
412}
413
414#[derive(Serialize, Debug, clickhouse::Row)]
415pub struct CallEventRow {
416 // AppInfoBase
417 app_version: String,
418 major: Option<i32>,
419 minor: Option<i32>,
420 patch: Option<i32>,
421 release_channel: String,
422
423 // ClientEventBase
424 installation_id: String,
425 session_id: Option<String>,
426 is_staff: Option<bool>,
427 time: i64,
428
429 // CallEventRow
430 operation: String,
431 room_id: Option<u64>,
432 channel_id: Option<u64>,
433}
434
435impl CallEventRow {
436 fn from_event(
437 event: CallEvent,
438 wrapper: &EventWrapper,
439 body: &EventRequestBody,
440 first_event_at: chrono::DateTime<chrono::Utc>,
441 ) -> Self {
442 let semver = body.semver();
443 let time =
444 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
445
446 Self {
447 app_version: body.app_version.clone(),
448 major: semver.map(|s| s.major as i32),
449 minor: semver.map(|s| s.minor as i32),
450 patch: semver.map(|s| s.patch as i32),
451 release_channel: body.release_channel.clone().unwrap_or_default(),
452 installation_id: body.installation_id.clone().unwrap_or_default(),
453 session_id: body.session_id.clone(),
454 is_staff: body.is_staff,
455 time: time.timestamp_millis(),
456 operation: event.operation,
457 room_id: event.room_id,
458 channel_id: event.channel_id,
459 }
460 }
461}
462
463#[derive(Serialize, Debug, clickhouse::Row)]
464pub struct AssistantEventRow {
465 // AppInfoBase
466 app_version: String,
467 major: Option<i32>,
468 minor: Option<i32>,
469 patch: Option<i32>,
470 release_channel: String,
471
472 // ClientEventBase
473 installation_id: Option<String>,
474 session_id: Option<String>,
475 is_staff: Option<bool>,
476 time: i64,
477
478 // AssistantEventRow
479 conversation_id: String,
480 kind: String,
481 model: String,
482}
483
484impl AssistantEventRow {
485 fn from_event(
486 event: AssistantEvent,
487 wrapper: &EventWrapper,
488 body: &EventRequestBody,
489 first_event_at: chrono::DateTime<chrono::Utc>,
490 ) -> Self {
491 let semver = body.semver();
492 let time =
493 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
494
495 Self {
496 app_version: body.app_version.clone(),
497 major: semver.map(|s| s.major as i32),
498 minor: semver.map(|s| s.minor as i32),
499 patch: semver.map(|s| s.patch as i32),
500 release_channel: body.release_channel.clone().unwrap_or_default(),
501 installation_id: body.installation_id.clone(),
502 session_id: body.session_id.clone(),
503 is_staff: body.is_staff,
504 time: time.timestamp_millis(),
505 conversation_id: event.conversation_id.unwrap_or_default(),
506 kind: event.kind.to_string(),
507 model: event.model,
508 }
509 }
510}
511
512#[derive(Debug, clickhouse::Row, Serialize)]
513pub struct CpuEventRow {
514 pub installation_id: Option<String>,
515 pub is_staff: Option<bool>,
516 pub usage_as_percentage: f32,
517 pub core_count: u32,
518 pub app_version: String,
519 pub release_channel: String,
520 pub time: i64,
521 pub session_id: Option<String>,
522 // pub normalized_cpu_usage: f64, MATERIALIZED
523 pub major: Option<i32>,
524 pub minor: Option<i32>,
525 pub patch: Option<i32>,
526}
527
528impl CpuEventRow {
529 fn from_event(
530 event: CpuEvent,
531 wrapper: &EventWrapper,
532 body: &EventRequestBody,
533 first_event_at: chrono::DateTime<chrono::Utc>,
534 ) -> Self {
535 let semver = body.semver();
536 let time =
537 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
538
539 Self {
540 app_version: body.app_version.clone(),
541 major: semver.map(|s| s.major as i32),
542 minor: semver.map(|s| s.minor as i32),
543 patch: semver.map(|s| s.patch as i32),
544 release_channel: body.release_channel.clone().unwrap_or_default(),
545 installation_id: body.installation_id.clone(),
546 session_id: body.session_id.clone(),
547 is_staff: body.is_staff,
548 time: time.timestamp_millis(),
549 usage_as_percentage: event.usage_as_percentage,
550 core_count: event.core_count,
551 }
552 }
553}
554
555#[derive(Serialize, Debug, clickhouse::Row)]
556pub struct MemoryEventRow {
557 // AppInfoBase
558 app_version: String,
559 major: Option<i32>,
560 minor: Option<i32>,
561 patch: Option<i32>,
562 release_channel: String,
563
564 // ClientEventBase
565 installation_id: Option<String>,
566 session_id: Option<String>,
567 is_staff: Option<bool>,
568 time: i64,
569
570 // MemoryEventRow
571 memory_in_bytes: u64,
572 virtual_memory_in_bytes: u64,
573}
574
575impl MemoryEventRow {
576 fn from_event(
577 event: MemoryEvent,
578 wrapper: &EventWrapper,
579 body: &EventRequestBody,
580 first_event_at: chrono::DateTime<chrono::Utc>,
581 ) -> Self {
582 let semver = body.semver();
583 let time =
584 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
585
586 Self {
587 app_version: body.app_version.clone(),
588 major: semver.map(|s| s.major as i32),
589 minor: semver.map(|s| s.minor as i32),
590 patch: semver.map(|s| s.patch as i32),
591 release_channel: body.release_channel.clone().unwrap_or_default(),
592 installation_id: body.installation_id.clone(),
593 session_id: body.session_id.clone(),
594 is_staff: body.is_staff,
595 time: time.timestamp_millis(),
596 memory_in_bytes: event.memory_in_bytes,
597 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
598 }
599 }
600}
601
602#[derive(Serialize, Debug, clickhouse::Row)]
603pub struct AppEventRow {
604 // AppInfoBase
605 app_version: String,
606 major: Option<i32>,
607 minor: Option<i32>,
608 patch: Option<i32>,
609 release_channel: String,
610
611 // ClientEventBase
612 installation_id: Option<String>,
613 session_id: Option<String>,
614 is_staff: Option<bool>,
615 time: i64,
616
617 // AppEventRow
618 operation: String,
619}
620
621impl AppEventRow {
622 fn from_event(
623 event: AppEvent,
624 wrapper: &EventWrapper,
625 body: &EventRequestBody,
626 first_event_at: chrono::DateTime<chrono::Utc>,
627 ) -> Self {
628 let semver = body.semver();
629 let time =
630 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
631
632 Self {
633 app_version: body.app_version.clone(),
634 major: semver.map(|s| s.major as i32),
635 minor: semver.map(|s| s.minor as i32),
636 patch: semver.map(|s| s.patch as i32),
637 release_channel: body.release_channel.clone().unwrap_or_default(),
638 installation_id: body.installation_id.clone(),
639 session_id: body.session_id.clone(),
640 is_staff: body.is_staff,
641 time: time.timestamp_millis(),
642 operation: event.operation,
643 }
644 }
645}
646
647#[derive(Serialize, Debug, clickhouse::Row)]
648pub struct SettingEventRow {
649 // AppInfoBase
650 app_version: String,
651 major: Option<i32>,
652 minor: Option<i32>,
653 patch: Option<i32>,
654 release_channel: String,
655
656 // ClientEventBase
657 installation_id: Option<String>,
658 session_id: Option<String>,
659 is_staff: Option<bool>,
660 time: i64,
661 // SettingEventRow
662 setting: String,
663 value: String,
664}
665
666impl SettingEventRow {
667 fn from_event(
668 event: SettingEvent,
669 wrapper: &EventWrapper,
670 body: &EventRequestBody,
671 first_event_at: chrono::DateTime<chrono::Utc>,
672 ) -> Self {
673 let semver = body.semver();
674 let time =
675 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
676
677 Self {
678 app_version: body.app_version.clone(),
679 major: semver.map(|s| s.major as i32),
680 minor: semver.map(|s| s.minor as i32),
681 patch: semver.map(|s| s.patch as i32),
682 release_channel: body.release_channel.clone().unwrap_or_default(),
683 installation_id: body.installation_id.clone(),
684 session_id: body.session_id.clone(),
685 is_staff: body.is_staff,
686 time: time.timestamp_millis(),
687 setting: event.setting,
688 value: event.value,
689 }
690 }
691}
692
693#[derive(Serialize, Debug, clickhouse::Row)]
694pub struct EditEventRow {
695 // AppInfoBase
696 app_version: String,
697 major: Option<i32>,
698 minor: Option<i32>,
699 patch: Option<i32>,
700 release_channel: String,
701
702 // ClientEventBase
703 installation_id: Option<String>,
704 // Note: This column name has a typo in the ClickHouse table.
705 #[serde(rename = "sesssion_id")]
706 session_id: Option<String>,
707 is_staff: Option<bool>,
708 time: i64,
709
710 // EditEventRow
711 period_start: i64,
712 period_end: i64,
713 environment: String,
714}
715
716impl EditEventRow {
717 fn from_event(
718 event: EditEvent,
719 wrapper: &EventWrapper,
720 body: &EventRequestBody,
721 first_event_at: chrono::DateTime<chrono::Utc>,
722 ) -> Self {
723 let semver = body.semver();
724 let time =
725 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
726
727 let period_start = time - chrono::Duration::milliseconds(event.duration);
728 let period_end = time;
729
730 Self {
731 app_version: body.app_version.clone(),
732 major: semver.map(|s| s.major as i32),
733 minor: semver.map(|s| s.minor as i32),
734 patch: semver.map(|s| s.patch as i32),
735 release_channel: body.release_channel.clone().unwrap_or_default(),
736 installation_id: body.installation_id.clone(),
737 session_id: body.session_id.clone(),
738 is_staff: body.is_staff,
739 time: time.timestamp_millis(),
740 period_start: period_start.timestamp_millis(),
741 period_end: period_end.timestamp_millis(),
742 environment: event.environment,
743 }
744 }
745}
746
747#[derive(Serialize, Debug, clickhouse::Row)]
748pub struct ActionEventRow {
749 // AppInfoBase
750 app_version: String,
751 major: Option<i32>,
752 minor: Option<i32>,
753 patch: Option<i32>,
754 release_channel: String,
755
756 // ClientEventBase
757 installation_id: Option<String>,
758 // Note: This column name has a typo in the ClickHouse table.
759 #[serde(rename = "sesssion_id")]
760 session_id: Option<String>,
761 is_staff: Option<bool>,
762 time: i64,
763 // ActionEventRow
764 source: String,
765 action: String,
766}
767
768impl ActionEventRow {
769 fn from_event(
770 event: ActionEvent,
771 wrapper: &EventWrapper,
772 body: &EventRequestBody,
773 first_event_at: chrono::DateTime<chrono::Utc>,
774 ) -> Self {
775 let semver = body.semver();
776 let time =
777 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
778
779 Self {
780 app_version: body.app_version.clone(),
781 major: semver.map(|s| s.major as i32),
782 minor: semver.map(|s| s.minor as i32),
783 patch: semver.map(|s| s.patch as i32),
784 release_channel: body.release_channel.clone().unwrap_or_default(),
785 installation_id: body.installation_id.clone(),
786 session_id: body.session_id.clone(),
787 is_staff: body.is_staff,
788 time: time.timestamp_millis(),
789 source: event.source,
790 action: event.action,
791 }
792 }
793}