1use std::sync::{Arc, OnceLock};
2
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
7};
8use hyper::{HeaderMap, StatusCode};
9use serde::{Serialize, Serializer};
10use sha2::{Digest, Sha256};
11use telemetry_events::{
12 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
13 EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
14};
15use util::SemanticVersion;
16
17use crate::{api::slack, AppState, Error, Result};
18
19use super::ips_file::IpsFile;
20
21pub fn router() -> Router {
22 Router::new()
23 .route("/telemetry/events", post(post_events))
24 .route("/telemetry/crashes", post(post_crash))
25}
26
27pub struct ZedChecksumHeader(Vec<u8>);
28
29impl Header for ZedChecksumHeader {
30 fn name() -> &'static HeaderName {
31 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
32 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
33 }
34
35 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
36 where
37 Self: Sized,
38 I: Iterator<Item = &'i axum::http::HeaderValue>,
39 {
40 let checksum = values
41 .next()
42 .ok_or_else(axum::headers::Error::invalid)?
43 .to_str()
44 .map_err(|_| axum::headers::Error::invalid())?;
45
46 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
47 Ok(Self(bytes))
48 }
49
50 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
51 unimplemented!()
52 }
53}
54
55pub struct CloudflareIpCountryHeader(String);
56
57impl Header for CloudflareIpCountryHeader {
58 fn name() -> &'static HeaderName {
59 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
60 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
61 }
62
63 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
64 where
65 Self: Sized,
66 I: Iterator<Item = &'i axum::http::HeaderValue>,
67 {
68 let country_code = values
69 .next()
70 .ok_or_else(axum::headers::Error::invalid)?
71 .to_str()
72 .map_err(|_| axum::headers::Error::invalid())?;
73
74 Ok(Self(country_code.to_string()))
75 }
76
77 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
78 unimplemented!()
79 }
80}
81
82pub async fn post_crash(
83 Extension(app): Extension<Arc<AppState>>,
84 body: Bytes,
85 headers: HeaderMap,
86) -> Result<()> {
87 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
88
89 let report = IpsFile::parse(&body)?;
90 let version_threshold = SemanticVersion::new(0, 123, 0);
91
92 let bundle_id = &report.header.bundle_id;
93 let app_version = &report.app_version();
94
95 if bundle_id == "dev.zed.Zed-Dev" {
96 log::error!("Crash uploads from {} are ignored.", bundle_id);
97 return Ok(());
98 }
99
100 if app_version.is_none() || app_version.unwrap() < version_threshold {
101 log::error!(
102 "Crash uploads from {} are ignored.",
103 report.header.app_version
104 );
105 return Ok(());
106 }
107 let app_version = app_version.unwrap();
108
109 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
110 let response = blob_store_client
111 .head_object()
112 .bucket(CRASH_REPORTS_BUCKET)
113 .key(report.header.incident_id.clone() + ".ips")
114 .send()
115 .await;
116
117 if response.is_ok() {
118 log::info!("We've already uploaded this crash");
119 return Ok(());
120 }
121
122 blob_store_client
123 .put_object()
124 .bucket(CRASH_REPORTS_BUCKET)
125 .key(report.header.incident_id.clone() + ".ips")
126 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
127 .body(ByteStream::from(body.to_vec()))
128 .send()
129 .await
130 .map_err(|e| log::error!("Failed to upload crash: {}", e))
131 .ok();
132 }
133
134 let recent_panic_on: Option<i64> = headers
135 .get("x-zed-panicked-on")
136 .and_then(|h| h.to_str().ok())
137 .and_then(|s| s.parse().ok());
138 let mut recent_panic = None;
139
140 if let Some(recent_panic_on) = recent_panic_on {
141 let crashed_at = match report.timestamp() {
142 Ok(t) => Some(t),
143 Err(e) => {
144 log::error!("Can't parse {}: {}", report.header.timestamp, e);
145 None
146 }
147 };
148 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
149 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
150 }
151 }
152
153 let description = report.description(recent_panic);
154 let summary = report.backtrace_summary();
155
156 tracing::error!(
157 service = "client",
158 version = %report.header.app_version,
159 os_version = %report.header.os_version,
160 bundle_id = %report.header.bundle_id,
161 incident_id = %report.header.incident_id,
162 description = %description,
163 backtrace = %summary,
164 "crash report");
165
166 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
167 let payload = slack::WebhookBody::new(|w| {
168 w.add_section(|s| s.text(slack::Text::markdown(description)))
169 .add_section(|s| {
170 s.add_field(slack::Text::markdown(format!(
171 "*Version:*\n{} ({})",
172 bundle_id, app_version
173 )))
174 .add_field({
175 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
176 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
177 hostname.strip_prefix("http://").unwrap_or_default()
178 });
179
180 slack::Text::markdown(format!(
181 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
182 CRASH_REPORTS_BUCKET,
183 hostname,
184 report.header.incident_id,
185 report
186 .header
187 .incident_id
188 .chars()
189 .take(8)
190 .collect::<String>(),
191 ))
192 })
193 })
194 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
195 });
196 let payload_json = serde_json::to_string(&payload).map_err(|err| {
197 log::error!("Failed to serialize payload to JSON: {err}");
198 Error::Internal(anyhow!(err))
199 })?;
200
201 reqwest::Client::new()
202 .post(slack_panics_webhook)
203 .header("Content-Type", "application/json")
204 .body(payload_json)
205 .send()
206 .await
207 .map_err(|err| {
208 log::error!("Failed to send payload to Slack: {err}");
209 Error::Internal(anyhow!(err))
210 })?;
211 }
212
213 Ok(())
214}
215
216pub async fn post_events(
217 Extension(app): Extension<Arc<AppState>>,
218 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
219 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
220 body: Bytes,
221) -> Result<()> {
222 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
223 Err(Error::Http(
224 StatusCode::NOT_IMPLEMENTED,
225 "not supported".into(),
226 ))?
227 };
228
229 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
230 return Err(Error::Http(
231 StatusCode::INTERNAL_SERVER_ERROR,
232 "events not enabled".into(),
233 ))?;
234 };
235
236 let mut summer = Sha256::new();
237 summer.update(checksum_seed);
238 summer.update(&body);
239 summer.update(checksum_seed);
240
241 if &checksum != &summer.finalize()[..] {
242 return Err(Error::Http(
243 StatusCode::BAD_REQUEST,
244 "invalid checksum".into(),
245 ))?;
246 }
247
248 let request_body: telemetry_events::EventRequestBody =
249 serde_json::from_slice(&body).map_err(|err| {
250 log::error!("can't parse event json: {err}");
251 Error::Internal(anyhow!(err))
252 })?;
253
254 let mut to_upload = ToUpload::default();
255 let Some(last_event) = request_body.events.last() else {
256 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
257 };
258 let country_code = country_code_header.map(|h| h.0 .0);
259
260 let first_event_at = chrono::Utc::now()
261 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
262
263 for wrapper in &request_body.events {
264 match &wrapper.event {
265 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
266 event.clone(),
267 &wrapper,
268 &request_body,
269 first_event_at,
270 country_code.clone(),
271 )),
272 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
273 event.clone(),
274 &wrapper,
275 &request_body,
276 first_event_at,
277 country_code.clone(),
278 )),
279 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
280 event.clone(),
281 &wrapper,
282 &request_body,
283 first_event_at,
284 )),
285 Event::Assistant(event) => {
286 to_upload
287 .assistant_events
288 .push(AssistantEventRow::from_event(
289 event.clone(),
290 &wrapper,
291 &request_body,
292 first_event_at,
293 ))
294 }
295 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
296 event.clone(),
297 &wrapper,
298 &request_body,
299 first_event_at,
300 )),
301 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
302 event.clone(),
303 &wrapper,
304 &request_body,
305 first_event_at,
306 )),
307 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
308 event.clone(),
309 &wrapper,
310 &request_body,
311 first_event_at,
312 )),
313 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
314 event.clone(),
315 &wrapper,
316 &request_body,
317 first_event_at,
318 )),
319 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
320 event.clone(),
321 &wrapper,
322 &request_body,
323 first_event_at,
324 )),
325 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
326 event.clone(),
327 &wrapper,
328 &request_body,
329 first_event_at,
330 )),
331 }
332 }
333
334 to_upload
335 .upload(&clickhouse_client)
336 .await
337 .map_err(|err| Error::Internal(anyhow!(err)))?;
338
339 Ok(())
340}
341
342#[derive(Default)]
343struct ToUpload {
344 editor_events: Vec<EditorEventRow>,
345 copilot_events: Vec<CopilotEventRow>,
346 assistant_events: Vec<AssistantEventRow>,
347 call_events: Vec<CallEventRow>,
348 cpu_events: Vec<CpuEventRow>,
349 memory_events: Vec<MemoryEventRow>,
350 app_events: Vec<AppEventRow>,
351 setting_events: Vec<SettingEventRow>,
352 edit_events: Vec<EditEventRow>,
353 action_events: Vec<ActionEventRow>,
354}
355
356impl ToUpload {
357 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
358 const EDITOR_EVENTS_TABLE: &str = "editor_events";
359 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
360 .await
361 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
362
363 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
364 Self::upload_to_table(
365 COPILOT_EVENTS_TABLE,
366 &self.copilot_events,
367 clickhouse_client,
368 )
369 .await
370 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
371
372 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
373 Self::upload_to_table(
374 ASSISTANT_EVENTS_TABLE,
375 &self.assistant_events,
376 clickhouse_client,
377 )
378 .await
379 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
380
381 const CALL_EVENTS_TABLE: &str = "call_events";
382 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
383 .await
384 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
385
386 const CPU_EVENTS_TABLE: &str = "cpu_events";
387 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
388 .await
389 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
390
391 const MEMORY_EVENTS_TABLE: &str = "memory_events";
392 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
393 .await
394 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
395
396 const APP_EVENTS_TABLE: &str = "app_events";
397 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
398 .await
399 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
400
401 const SETTING_EVENTS_TABLE: &str = "setting_events";
402 Self::upload_to_table(
403 SETTING_EVENTS_TABLE,
404 &self.setting_events,
405 clickhouse_client,
406 )
407 .await
408 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
409
410 const EDIT_EVENTS_TABLE: &str = "edit_events";
411 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
412 .await
413 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
414
415 const ACTION_EVENTS_TABLE: &str = "action_events";
416 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
417 .await
418 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
419
420 Ok(())
421 }
422
423 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
424 table: &str,
425 rows: &[T],
426 clickhouse_client: &clickhouse::Client,
427 ) -> anyhow::Result<()> {
428 if !rows.is_empty() {
429 let mut insert = clickhouse_client.insert(table)?;
430
431 for event in rows {
432 insert.write(event).await?;
433 }
434
435 insert.end().await?;
436 }
437
438 Ok(())
439 }
440}
441
442pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
443where
444 S: Serializer,
445{
446 if country_code.len() != 2 {
447 use serde::ser::Error;
448 return Err(S::Error::custom(
449 "country_code must be exactly 2 characters",
450 ));
451 }
452
453 let country_code = country_code.as_bytes();
454
455 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
456}
457
458#[derive(Serialize, Debug, clickhouse::Row)]
459pub struct EditorEventRow {
460 pub installation_id: String,
461 pub operation: String,
462 pub app_version: String,
463 pub file_extension: String,
464 pub os_name: String,
465 pub os_version: String,
466 pub release_channel: String,
467 pub signed_in: bool,
468 pub vim_mode: bool,
469 #[serde(serialize_with = "serialize_country_code")]
470 pub country_code: String,
471 pub region_code: String,
472 pub city: String,
473 pub time: i64,
474 pub copilot_enabled: bool,
475 pub copilot_enabled_for_language: bool,
476 pub historical_event: bool,
477 pub architecture: String,
478 pub is_staff: Option<bool>,
479 pub session_id: Option<String>,
480 pub major: Option<i32>,
481 pub minor: Option<i32>,
482 pub patch: Option<i32>,
483}
484
485impl EditorEventRow {
486 fn from_event(
487 event: EditorEvent,
488 wrapper: &EventWrapper,
489 body: &EventRequestBody,
490 first_event_at: chrono::DateTime<chrono::Utc>,
491 country_code: Option<String>,
492 ) -> Self {
493 let semver = body.semver();
494 let time =
495 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
496
497 Self {
498 app_version: body.app_version.clone(),
499 major: semver.map(|s| s.major as i32),
500 minor: semver.map(|s| s.minor as i32),
501 patch: semver.map(|s| s.patch as i32),
502 release_channel: body.release_channel.clone().unwrap_or_default(),
503 os_name: body.os_name.clone(),
504 os_version: body.os_version.clone().unwrap_or_default(),
505 architecture: body.architecture.clone(),
506 installation_id: body.installation_id.clone().unwrap_or_default(),
507 session_id: body.session_id.clone(),
508 is_staff: body.is_staff,
509 time: time.timestamp_millis(),
510 operation: event.operation,
511 file_extension: event.file_extension.unwrap_or_default(),
512 signed_in: wrapper.signed_in,
513 vim_mode: event.vim_mode,
514 copilot_enabled: event.copilot_enabled,
515 copilot_enabled_for_language: event.copilot_enabled_for_language,
516 country_code: country_code.unwrap_or("XX".to_string()),
517 region_code: "".to_string(),
518 city: "".to_string(),
519 historical_event: false,
520 }
521 }
522}
523
524#[derive(Serialize, Debug, clickhouse::Row)]
525pub struct CopilotEventRow {
526 pub installation_id: String,
527 pub suggestion_id: String,
528 pub suggestion_accepted: bool,
529 pub app_version: String,
530 pub file_extension: String,
531 pub os_name: String,
532 pub os_version: String,
533 pub release_channel: String,
534 pub signed_in: bool,
535 #[serde(serialize_with = "serialize_country_code")]
536 pub country_code: String,
537 pub region_code: String,
538 pub city: String,
539 pub time: i64,
540 pub is_staff: Option<bool>,
541 pub session_id: Option<String>,
542 pub major: Option<i32>,
543 pub minor: Option<i32>,
544 pub patch: Option<i32>,
545}
546
547impl CopilotEventRow {
548 fn from_event(
549 event: CopilotEvent,
550 wrapper: &EventWrapper,
551 body: &EventRequestBody,
552 first_event_at: chrono::DateTime<chrono::Utc>,
553 country_code: Option<String>,
554 ) -> Self {
555 let semver = body.semver();
556 let time =
557 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
558
559 Self {
560 app_version: body.app_version.clone(),
561 major: semver.map(|s| s.major as i32),
562 minor: semver.map(|s| s.minor as i32),
563 patch: semver.map(|s| s.patch as i32),
564 release_channel: body.release_channel.clone().unwrap_or_default(),
565 os_name: body.os_name.clone(),
566 os_version: body.os_version.clone().unwrap_or_default(),
567 installation_id: body.installation_id.clone().unwrap_or_default(),
568 session_id: body.session_id.clone(),
569 is_staff: body.is_staff,
570 time: time.timestamp_millis(),
571 file_extension: event.file_extension.unwrap_or_default(),
572 signed_in: wrapper.signed_in,
573 country_code: country_code.unwrap_or("XX".to_string()),
574 region_code: "".to_string(),
575 city: "".to_string(),
576 suggestion_id: event.suggestion_id.unwrap_or_default(),
577 suggestion_accepted: event.suggestion_accepted,
578 }
579 }
580}
581
582#[derive(Serialize, Debug, clickhouse::Row)]
583pub struct CallEventRow {
584 // AppInfoBase
585 app_version: String,
586 major: Option<i32>,
587 minor: Option<i32>,
588 patch: Option<i32>,
589 release_channel: String,
590
591 // ClientEventBase
592 installation_id: String,
593 session_id: Option<String>,
594 is_staff: Option<bool>,
595 time: i64,
596
597 // CallEventRow
598 operation: String,
599 room_id: Option<u64>,
600 channel_id: Option<u64>,
601}
602
603impl CallEventRow {
604 fn from_event(
605 event: CallEvent,
606 wrapper: &EventWrapper,
607 body: &EventRequestBody,
608 first_event_at: chrono::DateTime<chrono::Utc>,
609 ) -> Self {
610 let semver = body.semver();
611 let time =
612 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
613
614 Self {
615 app_version: body.app_version.clone(),
616 major: semver.map(|s| s.major as i32),
617 minor: semver.map(|s| s.minor as i32),
618 patch: semver.map(|s| s.patch as i32),
619 release_channel: body.release_channel.clone().unwrap_or_default(),
620 installation_id: body.installation_id.clone().unwrap_or_default(),
621 session_id: body.session_id.clone(),
622 is_staff: body.is_staff,
623 time: time.timestamp_millis(),
624 operation: event.operation,
625 room_id: event.room_id,
626 channel_id: event.channel_id,
627 }
628 }
629}
630
631#[derive(Serialize, Debug, clickhouse::Row)]
632pub struct AssistantEventRow {
633 // AppInfoBase
634 app_version: String,
635 major: Option<i32>,
636 minor: Option<i32>,
637 patch: Option<i32>,
638 release_channel: String,
639
640 // ClientEventBase
641 installation_id: Option<String>,
642 session_id: Option<String>,
643 is_staff: Option<bool>,
644 time: i64,
645
646 // AssistantEventRow
647 conversation_id: String,
648 kind: String,
649 model: String,
650}
651
652impl AssistantEventRow {
653 fn from_event(
654 event: AssistantEvent,
655 wrapper: &EventWrapper,
656 body: &EventRequestBody,
657 first_event_at: chrono::DateTime<chrono::Utc>,
658 ) -> Self {
659 let semver = body.semver();
660 let time =
661 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
662
663 Self {
664 app_version: body.app_version.clone(),
665 major: semver.map(|s| s.major as i32),
666 minor: semver.map(|s| s.minor as i32),
667 patch: semver.map(|s| s.patch as i32),
668 release_channel: body.release_channel.clone().unwrap_or_default(),
669 installation_id: body.installation_id.clone(),
670 session_id: body.session_id.clone(),
671 is_staff: body.is_staff,
672 time: time.timestamp_millis(),
673 conversation_id: event.conversation_id.unwrap_or_default(),
674 kind: event.kind.to_string(),
675 model: event.model,
676 }
677 }
678}
679
680#[derive(Debug, clickhouse::Row, Serialize)]
681pub struct CpuEventRow {
682 pub installation_id: Option<String>,
683 pub is_staff: Option<bool>,
684 pub usage_as_percentage: f32,
685 pub core_count: u32,
686 pub app_version: String,
687 pub release_channel: String,
688 pub time: i64,
689 pub session_id: Option<String>,
690 // pub normalized_cpu_usage: f64, MATERIALIZED
691 pub major: Option<i32>,
692 pub minor: Option<i32>,
693 pub patch: Option<i32>,
694}
695
696impl CpuEventRow {
697 fn from_event(
698 event: CpuEvent,
699 wrapper: &EventWrapper,
700 body: &EventRequestBody,
701 first_event_at: chrono::DateTime<chrono::Utc>,
702 ) -> Self {
703 let semver = body.semver();
704 let time =
705 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
706
707 Self {
708 app_version: body.app_version.clone(),
709 major: semver.map(|s| s.major as i32),
710 minor: semver.map(|s| s.minor as i32),
711 patch: semver.map(|s| s.patch as i32),
712 release_channel: body.release_channel.clone().unwrap_or_default(),
713 installation_id: body.installation_id.clone(),
714 session_id: body.session_id.clone(),
715 is_staff: body.is_staff,
716 time: time.timestamp_millis(),
717 usage_as_percentage: event.usage_as_percentage,
718 core_count: event.core_count,
719 }
720 }
721}
722
723#[derive(Serialize, Debug, clickhouse::Row)]
724pub struct MemoryEventRow {
725 // AppInfoBase
726 app_version: String,
727 major: Option<i32>,
728 minor: Option<i32>,
729 patch: Option<i32>,
730 release_channel: String,
731
732 // ClientEventBase
733 installation_id: Option<String>,
734 session_id: Option<String>,
735 is_staff: Option<bool>,
736 time: i64,
737
738 // MemoryEventRow
739 memory_in_bytes: u64,
740 virtual_memory_in_bytes: u64,
741}
742
743impl MemoryEventRow {
744 fn from_event(
745 event: MemoryEvent,
746 wrapper: &EventWrapper,
747 body: &EventRequestBody,
748 first_event_at: chrono::DateTime<chrono::Utc>,
749 ) -> Self {
750 let semver = body.semver();
751 let time =
752 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
753
754 Self {
755 app_version: body.app_version.clone(),
756 major: semver.map(|s| s.major as i32),
757 minor: semver.map(|s| s.minor as i32),
758 patch: semver.map(|s| s.patch as i32),
759 release_channel: body.release_channel.clone().unwrap_or_default(),
760 installation_id: body.installation_id.clone(),
761 session_id: body.session_id.clone(),
762 is_staff: body.is_staff,
763 time: time.timestamp_millis(),
764 memory_in_bytes: event.memory_in_bytes,
765 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
766 }
767 }
768}
769
770#[derive(Serialize, Debug, clickhouse::Row)]
771pub struct AppEventRow {
772 // AppInfoBase
773 app_version: String,
774 major: Option<i32>,
775 minor: Option<i32>,
776 patch: Option<i32>,
777 release_channel: String,
778
779 // ClientEventBase
780 installation_id: Option<String>,
781 session_id: Option<String>,
782 is_staff: Option<bool>,
783 time: i64,
784
785 // AppEventRow
786 operation: String,
787}
788
789impl AppEventRow {
790 fn from_event(
791 event: AppEvent,
792 wrapper: &EventWrapper,
793 body: &EventRequestBody,
794 first_event_at: chrono::DateTime<chrono::Utc>,
795 ) -> Self {
796 let semver = body.semver();
797 let time =
798 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
799
800 Self {
801 app_version: body.app_version.clone(),
802 major: semver.map(|s| s.major as i32),
803 minor: semver.map(|s| s.minor as i32),
804 patch: semver.map(|s| s.patch as i32),
805 release_channel: body.release_channel.clone().unwrap_or_default(),
806 installation_id: body.installation_id.clone(),
807 session_id: body.session_id.clone(),
808 is_staff: body.is_staff,
809 time: time.timestamp_millis(),
810 operation: event.operation,
811 }
812 }
813}
814
815#[derive(Serialize, Debug, clickhouse::Row)]
816pub struct SettingEventRow {
817 // AppInfoBase
818 app_version: String,
819 major: Option<i32>,
820 minor: Option<i32>,
821 patch: Option<i32>,
822 release_channel: String,
823
824 // ClientEventBase
825 installation_id: Option<String>,
826 session_id: Option<String>,
827 is_staff: Option<bool>,
828 time: i64,
829 // SettingEventRow
830 setting: String,
831 value: String,
832}
833
834impl SettingEventRow {
835 fn from_event(
836 event: SettingEvent,
837 wrapper: &EventWrapper,
838 body: &EventRequestBody,
839 first_event_at: chrono::DateTime<chrono::Utc>,
840 ) -> Self {
841 let semver = body.semver();
842 let time =
843 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
844
845 Self {
846 app_version: body.app_version.clone(),
847 major: semver.map(|s| s.major as i32),
848 minor: semver.map(|s| s.minor as i32),
849 patch: semver.map(|s| s.patch as i32),
850 release_channel: body.release_channel.clone().unwrap_or_default(),
851 installation_id: body.installation_id.clone(),
852 session_id: body.session_id.clone(),
853 is_staff: body.is_staff,
854 time: time.timestamp_millis(),
855 setting: event.setting,
856 value: event.value,
857 }
858 }
859}
860
861#[derive(Serialize, Debug, clickhouse::Row)]
862pub struct EditEventRow {
863 // AppInfoBase
864 app_version: String,
865 major: Option<i32>,
866 minor: Option<i32>,
867 patch: Option<i32>,
868 release_channel: String,
869
870 // ClientEventBase
871 installation_id: Option<String>,
872 // Note: This column name has a typo in the ClickHouse table.
873 #[serde(rename = "sesssion_id")]
874 session_id: Option<String>,
875 is_staff: Option<bool>,
876 time: i64,
877
878 // EditEventRow
879 period_start: i64,
880 period_end: i64,
881 environment: String,
882}
883
884impl EditEventRow {
885 fn from_event(
886 event: EditEvent,
887 wrapper: &EventWrapper,
888 body: &EventRequestBody,
889 first_event_at: chrono::DateTime<chrono::Utc>,
890 ) -> Self {
891 let semver = body.semver();
892 let time =
893 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
894
895 let period_start = time - chrono::Duration::milliseconds(event.duration);
896 let period_end = time;
897
898 Self {
899 app_version: body.app_version.clone(),
900 major: semver.map(|s| s.major as i32),
901 minor: semver.map(|s| s.minor as i32),
902 patch: semver.map(|s| s.patch as i32),
903 release_channel: body.release_channel.clone().unwrap_or_default(),
904 installation_id: body.installation_id.clone(),
905 session_id: body.session_id.clone(),
906 is_staff: body.is_staff,
907 time: time.timestamp_millis(),
908 period_start: period_start.timestamp_millis(),
909 period_end: period_end.timestamp_millis(),
910 environment: event.environment,
911 }
912 }
913}
914
915#[derive(Serialize, Debug, clickhouse::Row)]
916pub struct ActionEventRow {
917 // AppInfoBase
918 app_version: String,
919 major: Option<i32>,
920 minor: Option<i32>,
921 patch: Option<i32>,
922 release_channel: String,
923
924 // ClientEventBase
925 installation_id: Option<String>,
926 // Note: This column name has a typo in the ClickHouse table.
927 #[serde(rename = "sesssion_id")]
928 session_id: Option<String>,
929 is_staff: Option<bool>,
930 time: i64,
931 // ActionEventRow
932 source: String,
933 action: String,
934}
935
936impl ActionEventRow {
937 fn from_event(
938 event: ActionEvent,
939 wrapper: &EventWrapper,
940 body: &EventRequestBody,
941 first_event_at: chrono::DateTime<chrono::Utc>,
942 ) -> Self {
943 let semver = body.semver();
944 let time =
945 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
946
947 Self {
948 app_version: body.app_version.clone(),
949 major: semver.map(|s| s.major as i32),
950 minor: semver.map(|s| s.minor as i32),
951 patch: semver.map(|s| s.patch as i32),
952 release_channel: body.release_channel.clone().unwrap_or_default(),
953 installation_id: body.installation_id.clone(),
954 session_id: body.session_id.clone(),
955 is_staff: body.is_staff,
956 time: time.timestamp_millis(),
957 source: event.source,
958 action: event.action,
959 }
960 }
961}