1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use axum::{
5 body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
6};
7use hyper::StatusCode;
8use lazy_static::lazy_static;
9use serde::{Serialize, Serializer};
10use sha2::{Digest, Sha256};
11use telemetry_events::{
12 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
13 EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
14};
15
16use crate::{AppState, Error, Result};
17
18pub fn router() -> Router {
19 Router::new().route("/telemetry/events", post(post_events))
20}
21
22lazy_static! {
23 static ref ZED_CHECKSUM_HEADER: HeaderName = HeaderName::from_static("x-zed-checksum");
24 static ref CLOUDFLARE_IP_COUNTRY_HEADER: HeaderName = HeaderName::from_static("cf-ipcountry");
25}
26
27pub struct ZedChecksumHeader(Vec<u8>);
28
29impl Header for ZedChecksumHeader {
30 fn name() -> &'static HeaderName {
31 &ZED_CHECKSUM_HEADER
32 }
33
34 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
35 where
36 Self: Sized,
37 I: Iterator<Item = &'i axum::http::HeaderValue>,
38 {
39 let checksum = values
40 .next()
41 .ok_or_else(axum::headers::Error::invalid)?
42 .to_str()
43 .map_err(|_| axum::headers::Error::invalid())?;
44
45 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
46 Ok(Self(bytes))
47 }
48
49 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
50 unimplemented!()
51 }
52}
53
54pub struct CloudflareIpCountryHeader(String);
55
56impl Header for CloudflareIpCountryHeader {
57 fn name() -> &'static HeaderName {
58 &CLOUDFLARE_IP_COUNTRY_HEADER
59 }
60
61 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
62 where
63 Self: Sized,
64 I: Iterator<Item = &'i axum::http::HeaderValue>,
65 {
66 let country_code = values
67 .next()
68 .ok_or_else(axum::headers::Error::invalid)?
69 .to_str()
70 .map_err(|_| axum::headers::Error::invalid())?;
71
72 Ok(Self(country_code.to_string()))
73 }
74
75 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
76 unimplemented!()
77 }
78}
79
80pub async fn post_events(
81 Extension(app): Extension<Arc<AppState>>,
82 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
83 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
84 body: Bytes,
85) -> Result<()> {
86 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
87 Err(Error::Http(
88 StatusCode::NOT_IMPLEMENTED,
89 "not supported".into(),
90 ))?
91 };
92
93 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
94 return Err(Error::Http(
95 StatusCode::INTERNAL_SERVER_ERROR,
96 "events not enabled".into(),
97 ))?;
98 };
99
100 let mut summer = Sha256::new();
101 summer.update(checksum_seed);
102 summer.update(&body);
103 summer.update(checksum_seed);
104
105 if &checksum[..] != &summer.finalize()[..] {
106 return Err(Error::Http(
107 StatusCode::BAD_REQUEST,
108 "invalid checksum".into(),
109 ))?;
110 }
111
112 let request_body: telemetry_events::EventRequestBody =
113 serde_json::from_slice(&body).map_err(|err| {
114 log::error!("can't parse event json: {err}");
115 Error::Internal(anyhow!(err))
116 })?;
117
118 let mut to_upload = ToUpload::default();
119 let Some(last_event) = request_body.events.last() else {
120 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
121 };
122 let country_code = country_code_header.map(|h| h.0 .0);
123
124 let first_event_at = chrono::Utc::now()
125 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
126
127 for wrapper in &request_body.events {
128 match &wrapper.event {
129 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
130 event.clone(),
131 &wrapper,
132 &request_body,
133 first_event_at,
134 country_code.clone(),
135 )),
136 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
137 event.clone(),
138 &wrapper,
139 &request_body,
140 first_event_at,
141 country_code.clone(),
142 )),
143 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
144 event.clone(),
145 &wrapper,
146 &request_body,
147 first_event_at,
148 )),
149 Event::Assistant(event) => {
150 to_upload
151 .assistant_events
152 .push(AssistantEventRow::from_event(
153 event.clone(),
154 &wrapper,
155 &request_body,
156 first_event_at,
157 ))
158 }
159 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
160 event.clone(),
161 &wrapper,
162 &request_body,
163 first_event_at,
164 )),
165 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
166 event.clone(),
167 &wrapper,
168 &request_body,
169 first_event_at,
170 )),
171 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
172 event.clone(),
173 &wrapper,
174 &request_body,
175 first_event_at,
176 )),
177 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
178 event.clone(),
179 &wrapper,
180 &request_body,
181 first_event_at,
182 )),
183 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
184 event.clone(),
185 &wrapper,
186 &request_body,
187 first_event_at,
188 )),
189 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
190 event.clone(),
191 &wrapper,
192 &request_body,
193 first_event_at,
194 )),
195 }
196 }
197
198 to_upload
199 .upload(&clickhouse_client)
200 .await
201 .map_err(|err| Error::Internal(anyhow!(err)))?;
202
203 Ok(())
204}
205
206#[derive(Default)]
207struct ToUpload {
208 editor_events: Vec<EditorEventRow>,
209 copilot_events: Vec<CopilotEventRow>,
210 assistant_events: Vec<AssistantEventRow>,
211 call_events: Vec<CallEventRow>,
212 cpu_events: Vec<CpuEventRow>,
213 memory_events: Vec<MemoryEventRow>,
214 app_events: Vec<AppEventRow>,
215 setting_events: Vec<SettingEventRow>,
216 edit_events: Vec<EditEventRow>,
217 action_events: Vec<ActionEventRow>,
218}
219
220impl ToUpload {
221 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
222 Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
223 .await
224 .with_context(|| format!("failed to upload to table 'editor_events'"))?;
225 Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
226 .await
227 .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
228 Self::upload_to_table(
229 "assistant_events",
230 &self.assistant_events,
231 clickhouse_client,
232 )
233 .await
234 .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
235 Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
236 .await
237 .with_context(|| format!("failed to upload to table 'call_events'"))?;
238 Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
239 .await
240 .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
241 Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
242 .await
243 .with_context(|| format!("failed to upload to table 'memory_events'"))?;
244 Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
245 .await
246 .with_context(|| format!("failed to upload to table 'app_events'"))?;
247 Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
248 .await
249 .with_context(|| format!("failed to upload to table 'setting_events'"))?;
250 Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
251 .await
252 .with_context(|| format!("failed to upload to table 'edit_events'"))?;
253 Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
254 .await
255 .with_context(|| format!("failed to upload to table 'action_events'"))?;
256 Ok(())
257 }
258
259 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
260 table: &str,
261 rows: &[T],
262 clickhouse_client: &clickhouse::Client,
263 ) -> anyhow::Result<()> {
264 if !rows.is_empty() {
265 let mut insert = clickhouse_client.insert(table)?;
266
267 for event in rows {
268 insert.write(event).await?;
269 }
270
271 insert.end().await?;
272 }
273
274 Ok(())
275 }
276}
277
278pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
279where
280 S: Serializer,
281{
282 if country_code.len() != 2 {
283 use serde::ser::Error;
284 return Err(S::Error::custom(
285 "country_code must be exactly 2 characters",
286 ));
287 }
288
289 let country_code = country_code.as_bytes();
290
291 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
292}
293
294#[derive(Serialize, Debug, clickhouse::Row)]
295pub struct EditorEventRow {
296 pub installation_id: String,
297 pub operation: String,
298 pub app_version: String,
299 pub file_extension: String,
300 pub os_name: String,
301 pub os_version: String,
302 pub release_channel: String,
303 pub signed_in: bool,
304 pub vim_mode: bool,
305 #[serde(serialize_with = "serialize_country_code")]
306 pub country_code: String,
307 pub region_code: String,
308 pub city: String,
309 pub time: i64,
310 pub copilot_enabled: bool,
311 pub copilot_enabled_for_language: bool,
312 pub historical_event: bool,
313 pub architecture: String,
314 pub is_staff: Option<bool>,
315 pub session_id: Option<String>,
316 pub major: Option<i32>,
317 pub minor: Option<i32>,
318 pub patch: Option<i32>,
319}
320
321impl EditorEventRow {
322 fn from_event(
323 event: EditorEvent,
324 wrapper: &EventWrapper,
325 body: &EventRequestBody,
326 first_event_at: chrono::DateTime<chrono::Utc>,
327 country_code: Option<String>,
328 ) -> Self {
329 let semver = body.semver();
330 let time =
331 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
332
333 Self {
334 app_version: body.app_version.clone(),
335 major: semver.map(|s| s.major as i32),
336 minor: semver.map(|s| s.minor as i32),
337 patch: semver.map(|s| s.patch as i32),
338 release_channel: body.release_channel.clone().unwrap_or_default(),
339 os_name: body.os_name.clone(),
340 os_version: body.os_version.clone().unwrap_or_default(),
341 architecture: body.architecture.clone(),
342 installation_id: body.installation_id.clone().unwrap_or_default(),
343 session_id: body.session_id.clone(),
344 is_staff: body.is_staff,
345 time: time.timestamp_millis(),
346 operation: event.operation,
347 file_extension: event.file_extension.unwrap_or_default(),
348 signed_in: wrapper.signed_in,
349 vim_mode: event.vim_mode,
350 copilot_enabled: event.copilot_enabled,
351 copilot_enabled_for_language: event.copilot_enabled_for_language,
352 country_code: country_code.unwrap_or("XX".to_string()),
353 region_code: "".to_string(),
354 city: "".to_string(),
355 historical_event: false,
356 }
357 }
358}
359
360#[derive(Serialize, Debug, clickhouse::Row)]
361pub struct CopilotEventRow {
362 pub installation_id: String,
363 pub suggestion_id: String,
364 pub suggestion_accepted: bool,
365 pub app_version: String,
366 pub file_extension: String,
367 pub os_name: String,
368 pub os_version: String,
369 pub release_channel: String,
370 pub signed_in: bool,
371 #[serde(serialize_with = "serialize_country_code")]
372 pub country_code: String,
373 pub region_code: String,
374 pub city: String,
375 pub time: i64,
376 pub is_staff: Option<bool>,
377 pub session_id: Option<String>,
378 pub major: Option<i32>,
379 pub minor: Option<i32>,
380 pub patch: Option<i32>,
381}
382
383impl CopilotEventRow {
384 fn from_event(
385 event: CopilotEvent,
386 wrapper: &EventWrapper,
387 body: &EventRequestBody,
388 first_event_at: chrono::DateTime<chrono::Utc>,
389 country_code: Option<String>,
390 ) -> Self {
391 let semver = body.semver();
392 let time =
393 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
394
395 Self {
396 app_version: body.app_version.clone(),
397 major: semver.map(|s| s.major as i32),
398 minor: semver.map(|s| s.minor as i32),
399 patch: semver.map(|s| s.patch as i32),
400 release_channel: body.release_channel.clone().unwrap_or_default(),
401 os_name: body.os_name.clone(),
402 os_version: body.os_version.clone().unwrap_or_default(),
403 installation_id: body.installation_id.clone().unwrap_or_default(),
404 session_id: body.session_id.clone(),
405 is_staff: body.is_staff,
406 time: time.timestamp_millis(),
407 file_extension: event.file_extension.unwrap_or_default(),
408 signed_in: wrapper.signed_in,
409 country_code: country_code.unwrap_or("XX".to_string()),
410 region_code: "".to_string(),
411 city: "".to_string(),
412 suggestion_id: event.suggestion_id.unwrap_or_default(),
413 suggestion_accepted: event.suggestion_accepted,
414 }
415 }
416}
417
418#[derive(Serialize, Debug, clickhouse::Row)]
419pub struct CallEventRow {
420 // AppInfoBase
421 app_version: String,
422 major: Option<i32>,
423 minor: Option<i32>,
424 patch: Option<i32>,
425 release_channel: String,
426
427 // ClientEventBase
428 installation_id: String,
429 session_id: Option<String>,
430 is_staff: Option<bool>,
431 time: i64,
432
433 // CallEventRow
434 operation: String,
435 room_id: Option<u64>,
436 channel_id: Option<u64>,
437}
438
439impl CallEventRow {
440 fn from_event(
441 event: CallEvent,
442 wrapper: &EventWrapper,
443 body: &EventRequestBody,
444 first_event_at: chrono::DateTime<chrono::Utc>,
445 ) -> Self {
446 let semver = body.semver();
447 let time =
448 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
449
450 Self {
451 app_version: body.app_version.clone(),
452 major: semver.map(|s| s.major as i32),
453 minor: semver.map(|s| s.minor as i32),
454 patch: semver.map(|s| s.patch as i32),
455 release_channel: body.release_channel.clone().unwrap_or_default(),
456 installation_id: body.installation_id.clone().unwrap_or_default(),
457 session_id: body.session_id.clone(),
458 is_staff: body.is_staff,
459 time: time.timestamp_millis(),
460 operation: event.operation,
461 room_id: event.room_id,
462 channel_id: event.channel_id,
463 }
464 }
465}
466
467#[derive(Serialize, Debug, clickhouse::Row)]
468pub struct AssistantEventRow {
469 // AppInfoBase
470 app_version: String,
471 major: Option<i32>,
472 minor: Option<i32>,
473 patch: Option<i32>,
474 release_channel: String,
475
476 // ClientEventBase
477 installation_id: Option<String>,
478 session_id: Option<String>,
479 is_staff: Option<bool>,
480 time: i64,
481
482 // AssistantEventRow
483 conversation_id: String,
484 kind: String,
485 model: String,
486}
487
488impl AssistantEventRow {
489 fn from_event(
490 event: AssistantEvent,
491 wrapper: &EventWrapper,
492 body: &EventRequestBody,
493 first_event_at: chrono::DateTime<chrono::Utc>,
494 ) -> Self {
495 let semver = body.semver();
496 let time =
497 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
498
499 Self {
500 app_version: body.app_version.clone(),
501 major: semver.map(|s| s.major as i32),
502 minor: semver.map(|s| s.minor as i32),
503 patch: semver.map(|s| s.patch as i32),
504 release_channel: body.release_channel.clone().unwrap_or_default(),
505 installation_id: body.installation_id.clone(),
506 session_id: body.session_id.clone(),
507 is_staff: body.is_staff,
508 time: time.timestamp_millis(),
509 conversation_id: event.conversation_id.unwrap_or_default(),
510 kind: event.kind.to_string(),
511 model: event.model,
512 }
513 }
514}
515
516#[derive(Debug, clickhouse::Row, Serialize)]
517pub struct CpuEventRow {
518 pub installation_id: Option<String>,
519 pub is_staff: Option<bool>,
520 pub usage_as_percentage: f32,
521 pub core_count: u32,
522 pub app_version: String,
523 pub release_channel: String,
524 pub time: i64,
525 pub session_id: Option<String>,
526 // pub normalized_cpu_usage: f64, MATERIALIZED
527 pub major: Option<i32>,
528 pub minor: Option<i32>,
529 pub patch: Option<i32>,
530}
531
532impl CpuEventRow {
533 fn from_event(
534 event: CpuEvent,
535 wrapper: &EventWrapper,
536 body: &EventRequestBody,
537 first_event_at: chrono::DateTime<chrono::Utc>,
538 ) -> Self {
539 let semver = body.semver();
540 let time =
541 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
542
543 Self {
544 app_version: body.app_version.clone(),
545 major: semver.map(|s| s.major as i32),
546 minor: semver.map(|s| s.minor as i32),
547 patch: semver.map(|s| s.patch as i32),
548 release_channel: body.release_channel.clone().unwrap_or_default(),
549 installation_id: body.installation_id.clone(),
550 session_id: body.session_id.clone(),
551 is_staff: body.is_staff,
552 time: time.timestamp_millis(),
553 usage_as_percentage: event.usage_as_percentage,
554 core_count: event.core_count,
555 }
556 }
557}
558
559#[derive(Serialize, Debug, clickhouse::Row)]
560pub struct MemoryEventRow {
561 // AppInfoBase
562 app_version: String,
563 major: Option<i32>,
564 minor: Option<i32>,
565 patch: Option<i32>,
566 release_channel: String,
567
568 // ClientEventBase
569 installation_id: Option<String>,
570 session_id: Option<String>,
571 is_staff: Option<bool>,
572 time: i64,
573
574 // MemoryEventRow
575 memory_in_bytes: u64,
576 virtual_memory_in_bytes: u64,
577}
578
579impl MemoryEventRow {
580 fn from_event(
581 event: MemoryEvent,
582 wrapper: &EventWrapper,
583 body: &EventRequestBody,
584 first_event_at: chrono::DateTime<chrono::Utc>,
585 ) -> Self {
586 let semver = body.semver();
587 let time =
588 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
589
590 Self {
591 app_version: body.app_version.clone(),
592 major: semver.map(|s| s.major as i32),
593 minor: semver.map(|s| s.minor as i32),
594 patch: semver.map(|s| s.patch as i32),
595 release_channel: body.release_channel.clone().unwrap_or_default(),
596 installation_id: body.installation_id.clone(),
597 session_id: body.session_id.clone(),
598 is_staff: body.is_staff,
599 time: time.timestamp_millis(),
600 memory_in_bytes: event.memory_in_bytes,
601 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
602 }
603 }
604}
605
606#[derive(Serialize, Debug, clickhouse::Row)]
607pub struct AppEventRow {
608 // AppInfoBase
609 app_version: String,
610 major: Option<i32>,
611 minor: Option<i32>,
612 patch: Option<i32>,
613 release_channel: String,
614
615 // ClientEventBase
616 installation_id: Option<String>,
617 session_id: Option<String>,
618 is_staff: Option<bool>,
619 time: i64,
620
621 // AppEventRow
622 operation: String,
623}
624
625impl AppEventRow {
626 fn from_event(
627 event: AppEvent,
628 wrapper: &EventWrapper,
629 body: &EventRequestBody,
630 first_event_at: chrono::DateTime<chrono::Utc>,
631 ) -> Self {
632 let semver = body.semver();
633 let time =
634 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
635
636 Self {
637 app_version: body.app_version.clone(),
638 major: semver.map(|s| s.major as i32),
639 minor: semver.map(|s| s.minor as i32),
640 patch: semver.map(|s| s.patch as i32),
641 release_channel: body.release_channel.clone().unwrap_or_default(),
642 installation_id: body.installation_id.clone(),
643 session_id: body.session_id.clone(),
644 is_staff: body.is_staff,
645 time: time.timestamp_millis(),
646 operation: event.operation,
647 }
648 }
649}
650
651#[derive(Serialize, Debug, clickhouse::Row)]
652pub struct SettingEventRow {
653 // AppInfoBase
654 app_version: String,
655 major: Option<i32>,
656 minor: Option<i32>,
657 patch: Option<i32>,
658 release_channel: String,
659
660 // ClientEventBase
661 installation_id: Option<String>,
662 session_id: Option<String>,
663 is_staff: Option<bool>,
664 time: i64,
665 // SettingEventRow
666 setting: String,
667 value: String,
668}
669
670impl SettingEventRow {
671 fn from_event(
672 event: SettingEvent,
673 wrapper: &EventWrapper,
674 body: &EventRequestBody,
675 first_event_at: chrono::DateTime<chrono::Utc>,
676 ) -> Self {
677 let semver = body.semver();
678 let time =
679 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
680
681 Self {
682 app_version: body.app_version.clone(),
683 major: semver.map(|s| s.major as i32),
684 minor: semver.map(|s| s.minor as i32),
685 patch: semver.map(|s| s.patch as i32),
686 release_channel: body.release_channel.clone().unwrap_or_default(),
687 installation_id: body.installation_id.clone(),
688 session_id: body.session_id.clone(),
689 is_staff: body.is_staff,
690 time: time.timestamp_millis(),
691 setting: event.setting,
692 value: event.value,
693 }
694 }
695}
696
697#[derive(Serialize, Debug, clickhouse::Row)]
698pub struct EditEventRow {
699 // AppInfoBase
700 app_version: String,
701 major: Option<i32>,
702 minor: Option<i32>,
703 patch: Option<i32>,
704 release_channel: String,
705
706 // ClientEventBase
707 installation_id: Option<String>,
708 // Note: This column name has a typo in the ClickHouse table.
709 #[serde(rename = "sesssion_id")]
710 session_id: Option<String>,
711 is_staff: Option<bool>,
712 time: i64,
713
714 // EditEventRow
715 period_start: i64,
716 period_end: i64,
717 environment: String,
718}
719
720impl EditEventRow {
721 fn from_event(
722 event: EditEvent,
723 wrapper: &EventWrapper,
724 body: &EventRequestBody,
725 first_event_at: chrono::DateTime<chrono::Utc>,
726 ) -> Self {
727 let semver = body.semver();
728 let time =
729 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
730
731 let period_start = time - chrono::Duration::milliseconds(event.duration);
732 let period_end = time;
733
734 Self {
735 app_version: body.app_version.clone(),
736 major: semver.map(|s| s.major as i32),
737 minor: semver.map(|s| s.minor as i32),
738 patch: semver.map(|s| s.patch as i32),
739 release_channel: body.release_channel.clone().unwrap_or_default(),
740 installation_id: body.installation_id.clone(),
741 session_id: body.session_id.clone(),
742 is_staff: body.is_staff,
743 time: time.timestamp_millis(),
744 period_start: period_start.timestamp_millis(),
745 period_end: period_end.timestamp_millis(),
746 environment: event.environment,
747 }
748 }
749}
750
751#[derive(Serialize, Debug, clickhouse::Row)]
752pub struct ActionEventRow {
753 // AppInfoBase
754 app_version: String,
755 major: Option<i32>,
756 minor: Option<i32>,
757 patch: Option<i32>,
758 release_channel: String,
759
760 // ClientEventBase
761 installation_id: Option<String>,
762 // Note: This column name has a typo in the ClickHouse table.
763 #[serde(rename = "sesssion_id")]
764 session_id: Option<String>,
765 is_staff: Option<bool>,
766 time: i64,
767 // ActionEventRow
768 source: String,
769 action: String,
770}
771
772impl ActionEventRow {
773 fn from_event(
774 event: ActionEvent,
775 wrapper: &EventWrapper,
776 body: &EventRequestBody,
777 first_event_at: chrono::DateTime<chrono::Utc>,
778 ) -> Self {
779 let semver = body.semver();
780 let time =
781 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
782
783 Self {
784 app_version: body.app_version.clone(),
785 major: semver.map(|s| s.major as i32),
786 minor: semver.map(|s| s.minor as i32),
787 patch: semver.map(|s| s.patch as i32),
788 release_channel: body.release_channel.clone().unwrap_or_default(),
789 installation_id: body.installation_id.clone(),
790 session_id: body.session_id.clone(),
791 is_staff: body.is_staff,
792 time: time.timestamp_millis(),
793 source: event.source,
794 action: event.action,
795 }
796 }
797}