1use std::sync::{Arc, OnceLock};
2
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use serde::{Serialize, Serializer};
13use sha2::{Digest, Sha256};
14use telemetry_events::{
15 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
16 EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
17};
18use util::SemanticVersion;
19
20use crate::{api::slack, AppState, Error, Result};
21
22use super::ips_file::IpsFile;
23
24pub fn router() -> Router {
25 Router::new()
26 .route("/telemetry/events", post(post_events))
27 .route("/telemetry/crashes", post(post_crash))
28}
29
30pub struct ZedChecksumHeader(Vec<u8>);
31
32impl Header for ZedChecksumHeader {
33 fn name() -> &'static HeaderName {
34 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
35 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
36 }
37
38 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
39 where
40 Self: Sized,
41 I: Iterator<Item = &'i axum::http::HeaderValue>,
42 {
43 let checksum = values
44 .next()
45 .ok_or_else(axum::headers::Error::invalid)?
46 .to_str()
47 .map_err(|_| axum::headers::Error::invalid())?;
48
49 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
50 Ok(Self(bytes))
51 }
52
53 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
54 unimplemented!()
55 }
56}
57
58pub struct CloudflareIpCountryHeader(String);
59
60impl Header for CloudflareIpCountryHeader {
61 fn name() -> &'static HeaderName {
62 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
63 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
64 }
65
66 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
67 where
68 Self: Sized,
69 I: Iterator<Item = &'i axum::http::HeaderValue>,
70 {
71 let country_code = values
72 .next()
73 .ok_or_else(axum::headers::Error::invalid)?
74 .to_str()
75 .map_err(|_| axum::headers::Error::invalid())?;
76
77 Ok(Self(country_code.to_string()))
78 }
79
80 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
81 unimplemented!()
82 }
83}
84
85pub async fn post_crash(
86 Extension(app): Extension<Arc<AppState>>,
87 headers: HeaderMap,
88 body: Bytes,
89) -> Result<()> {
90 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
91
92 let report = IpsFile::parse(&body)?;
93 let version_threshold = SemanticVersion::new(0, 123, 0);
94
95 let bundle_id = &report.header.bundle_id;
96 let app_version = &report.app_version();
97
98 if bundle_id == "dev.zed.Zed-Dev" {
99 log::error!("Crash uploads from {} are ignored.", bundle_id);
100 return Ok(());
101 }
102
103 if app_version.is_none() || app_version.unwrap() < version_threshold {
104 log::error!(
105 "Crash uploads from {} are ignored.",
106 report.header.app_version
107 );
108 return Ok(());
109 }
110 let app_version = app_version.unwrap();
111
112 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
113 let response = blob_store_client
114 .head_object()
115 .bucket(CRASH_REPORTS_BUCKET)
116 .key(report.header.incident_id.clone() + ".ips")
117 .send()
118 .await;
119
120 if response.is_ok() {
121 log::info!("We've already uploaded this crash");
122 return Ok(());
123 }
124
125 blob_store_client
126 .put_object()
127 .bucket(CRASH_REPORTS_BUCKET)
128 .key(report.header.incident_id.clone() + ".ips")
129 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
130 .body(ByteStream::from(body.to_vec()))
131 .send()
132 .await
133 .map_err(|e| log::error!("Failed to upload crash: {}", e))
134 .ok();
135 }
136
137 let recent_panic_on: Option<i64> = headers
138 .get("x-zed-panicked-on")
139 .and_then(|h| h.to_str().ok())
140 .and_then(|s| s.parse().ok());
141 let mut recent_panic = None;
142
143 if let Some(recent_panic_on) = recent_panic_on {
144 let crashed_at = match report.timestamp() {
145 Ok(t) => Some(t),
146 Err(e) => {
147 log::error!("Can't parse {}: {}", report.header.timestamp, e);
148 None
149 }
150 };
151 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
152 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
153 }
154 }
155
156 let description = report.description(recent_panic);
157 let summary = report.backtrace_summary();
158
159 tracing::error!(
160 service = "client",
161 version = %report.header.app_version,
162 os_version = %report.header.os_version,
163 bundle_id = %report.header.bundle_id,
164 incident_id = %report.header.incident_id,
165 description = %description,
166 backtrace = %summary,
167 "crash report");
168
169 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
170 let payload = slack::WebhookBody::new(|w| {
171 w.add_section(|s| s.text(slack::Text::markdown(description)))
172 .add_section(|s| {
173 s.add_field(slack::Text::markdown(format!(
174 "*Version:*\n{} ({})",
175 bundle_id, app_version
176 )))
177 .add_field({
178 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
179 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
180 hostname.strip_prefix("http://").unwrap_or_default()
181 });
182
183 slack::Text::markdown(format!(
184 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
185 CRASH_REPORTS_BUCKET,
186 hostname,
187 report.header.incident_id,
188 report
189 .header
190 .incident_id
191 .chars()
192 .take(8)
193 .collect::<String>(),
194 ))
195 })
196 })
197 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
198 });
199 let payload_json = serde_json::to_string(&payload).map_err(|err| {
200 log::error!("Failed to serialize payload to JSON: {err}");
201 Error::Internal(anyhow!(err))
202 })?;
203
204 reqwest::Client::new()
205 .post(slack_panics_webhook)
206 .header("Content-Type", "application/json")
207 .body(payload_json)
208 .send()
209 .await
210 .map_err(|err| {
211 log::error!("Failed to send payload to Slack: {err}");
212 Error::Internal(anyhow!(err))
213 })?;
214 }
215
216 Ok(())
217}
218
219pub async fn post_events(
220 Extension(app): Extension<Arc<AppState>>,
221 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
222 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
223 body: Bytes,
224) -> Result<()> {
225 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
226 Err(Error::Http(
227 StatusCode::NOT_IMPLEMENTED,
228 "not supported".into(),
229 ))?
230 };
231
232 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
233 return Err(Error::Http(
234 StatusCode::INTERNAL_SERVER_ERROR,
235 "events not enabled".into(),
236 ))?;
237 };
238
239 let mut summer = Sha256::new();
240 summer.update(checksum_seed);
241 summer.update(&body);
242 summer.update(checksum_seed);
243
244 if &checksum != &summer.finalize()[..] {
245 return Err(Error::Http(
246 StatusCode::BAD_REQUEST,
247 "invalid checksum".into(),
248 ))?;
249 }
250
251 let request_body: telemetry_events::EventRequestBody =
252 serde_json::from_slice(&body).map_err(|err| {
253 log::error!("can't parse event json: {err}");
254 Error::Internal(anyhow!(err))
255 })?;
256
257 let mut to_upload = ToUpload::default();
258 let Some(last_event) = request_body.events.last() else {
259 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
260 };
261 let country_code = country_code_header.map(|h| h.0 .0);
262
263 let first_event_at = chrono::Utc::now()
264 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
265
266 for wrapper in &request_body.events {
267 match &wrapper.event {
268 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
269 event.clone(),
270 &wrapper,
271 &request_body,
272 first_event_at,
273 country_code.clone(),
274 )),
275 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
276 event.clone(),
277 &wrapper,
278 &request_body,
279 first_event_at,
280 country_code.clone(),
281 )),
282 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
283 event.clone(),
284 &wrapper,
285 &request_body,
286 first_event_at,
287 )),
288 Event::Assistant(event) => {
289 to_upload
290 .assistant_events
291 .push(AssistantEventRow::from_event(
292 event.clone(),
293 &wrapper,
294 &request_body,
295 first_event_at,
296 ))
297 }
298 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
299 event.clone(),
300 &wrapper,
301 &request_body,
302 first_event_at,
303 )),
304 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
305 event.clone(),
306 &wrapper,
307 &request_body,
308 first_event_at,
309 )),
310 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
311 event.clone(),
312 &wrapper,
313 &request_body,
314 first_event_at,
315 )),
316 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
317 event.clone(),
318 &wrapper,
319 &request_body,
320 first_event_at,
321 )),
322 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
323 event.clone(),
324 &wrapper,
325 &request_body,
326 first_event_at,
327 )),
328 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
329 event.clone(),
330 &wrapper,
331 &request_body,
332 first_event_at,
333 )),
334 }
335 }
336
337 to_upload
338 .upload(&clickhouse_client)
339 .await
340 .map_err(|err| Error::Internal(anyhow!(err)))?;
341
342 Ok(())
343}
344
345#[derive(Default)]
346struct ToUpload {
347 editor_events: Vec<EditorEventRow>,
348 copilot_events: Vec<CopilotEventRow>,
349 assistant_events: Vec<AssistantEventRow>,
350 call_events: Vec<CallEventRow>,
351 cpu_events: Vec<CpuEventRow>,
352 memory_events: Vec<MemoryEventRow>,
353 app_events: Vec<AppEventRow>,
354 setting_events: Vec<SettingEventRow>,
355 edit_events: Vec<EditEventRow>,
356 action_events: Vec<ActionEventRow>,
357}
358
359impl ToUpload {
360 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
361 const EDITOR_EVENTS_TABLE: &str = "editor_events";
362 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
363 .await
364 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
365
366 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
367 Self::upload_to_table(
368 COPILOT_EVENTS_TABLE,
369 &self.copilot_events,
370 clickhouse_client,
371 )
372 .await
373 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
374
375 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
376 Self::upload_to_table(
377 ASSISTANT_EVENTS_TABLE,
378 &self.assistant_events,
379 clickhouse_client,
380 )
381 .await
382 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
383
384 const CALL_EVENTS_TABLE: &str = "call_events";
385 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
386 .await
387 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
388
389 const CPU_EVENTS_TABLE: &str = "cpu_events";
390 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
391 .await
392 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
393
394 const MEMORY_EVENTS_TABLE: &str = "memory_events";
395 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
396 .await
397 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
398
399 const APP_EVENTS_TABLE: &str = "app_events";
400 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
401 .await
402 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
403
404 const SETTING_EVENTS_TABLE: &str = "setting_events";
405 Self::upload_to_table(
406 SETTING_EVENTS_TABLE,
407 &self.setting_events,
408 clickhouse_client,
409 )
410 .await
411 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
412
413 const EDIT_EVENTS_TABLE: &str = "edit_events";
414 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
415 .await
416 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
417
418 const ACTION_EVENTS_TABLE: &str = "action_events";
419 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
420 .await
421 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
422
423 Ok(())
424 }
425
426 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
427 table: &str,
428 rows: &[T],
429 clickhouse_client: &clickhouse::Client,
430 ) -> anyhow::Result<()> {
431 if !rows.is_empty() {
432 let mut insert = clickhouse_client.insert(table)?;
433
434 for event in rows {
435 insert.write(event).await?;
436 }
437
438 insert.end().await?;
439 }
440
441 Ok(())
442 }
443}
444
445pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
446where
447 S: Serializer,
448{
449 if country_code.len() != 2 {
450 use serde::ser::Error;
451 return Err(S::Error::custom(
452 "country_code must be exactly 2 characters",
453 ));
454 }
455
456 let country_code = country_code.as_bytes();
457
458 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
459}
460
461#[derive(Serialize, Debug, clickhouse::Row)]
462pub struct EditorEventRow {
463 pub installation_id: String,
464 pub operation: String,
465 pub app_version: String,
466 pub file_extension: String,
467 pub os_name: String,
468 pub os_version: String,
469 pub release_channel: String,
470 pub signed_in: bool,
471 pub vim_mode: bool,
472 #[serde(serialize_with = "serialize_country_code")]
473 pub country_code: String,
474 pub region_code: String,
475 pub city: String,
476 pub time: i64,
477 pub copilot_enabled: bool,
478 pub copilot_enabled_for_language: bool,
479 pub historical_event: bool,
480 pub architecture: String,
481 pub is_staff: Option<bool>,
482 pub session_id: Option<String>,
483 pub major: Option<i32>,
484 pub minor: Option<i32>,
485 pub patch: Option<i32>,
486}
487
488impl EditorEventRow {
489 fn from_event(
490 event: EditorEvent,
491 wrapper: &EventWrapper,
492 body: &EventRequestBody,
493 first_event_at: chrono::DateTime<chrono::Utc>,
494 country_code: Option<String>,
495 ) -> Self {
496 let semver = body.semver();
497 let time =
498 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
499
500 Self {
501 app_version: body.app_version.clone(),
502 major: semver.map(|s| s.major as i32),
503 minor: semver.map(|s| s.minor as i32),
504 patch: semver.map(|s| s.patch as i32),
505 release_channel: body.release_channel.clone().unwrap_or_default(),
506 os_name: body.os_name.clone(),
507 os_version: body.os_version.clone().unwrap_or_default(),
508 architecture: body.architecture.clone(),
509 installation_id: body.installation_id.clone().unwrap_or_default(),
510 session_id: body.session_id.clone(),
511 is_staff: body.is_staff,
512 time: time.timestamp_millis(),
513 operation: event.operation,
514 file_extension: event.file_extension.unwrap_or_default(),
515 signed_in: wrapper.signed_in,
516 vim_mode: event.vim_mode,
517 copilot_enabled: event.copilot_enabled,
518 copilot_enabled_for_language: event.copilot_enabled_for_language,
519 country_code: country_code.unwrap_or("XX".to_string()),
520 region_code: "".to_string(),
521 city: "".to_string(),
522 historical_event: false,
523 }
524 }
525}
526
527#[derive(Serialize, Debug, clickhouse::Row)]
528pub struct CopilotEventRow {
529 pub installation_id: String,
530 pub suggestion_id: String,
531 pub suggestion_accepted: bool,
532 pub app_version: String,
533 pub file_extension: String,
534 pub os_name: String,
535 pub os_version: String,
536 pub release_channel: String,
537 pub signed_in: bool,
538 #[serde(serialize_with = "serialize_country_code")]
539 pub country_code: String,
540 pub region_code: String,
541 pub city: String,
542 pub time: i64,
543 pub is_staff: Option<bool>,
544 pub session_id: Option<String>,
545 pub major: Option<i32>,
546 pub minor: Option<i32>,
547 pub patch: Option<i32>,
548}
549
550impl CopilotEventRow {
551 fn from_event(
552 event: CopilotEvent,
553 wrapper: &EventWrapper,
554 body: &EventRequestBody,
555 first_event_at: chrono::DateTime<chrono::Utc>,
556 country_code: Option<String>,
557 ) -> Self {
558 let semver = body.semver();
559 let time =
560 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
561
562 Self {
563 app_version: body.app_version.clone(),
564 major: semver.map(|s| s.major as i32),
565 minor: semver.map(|s| s.minor as i32),
566 patch: semver.map(|s| s.patch as i32),
567 release_channel: body.release_channel.clone().unwrap_or_default(),
568 os_name: body.os_name.clone(),
569 os_version: body.os_version.clone().unwrap_or_default(),
570 installation_id: body.installation_id.clone().unwrap_or_default(),
571 session_id: body.session_id.clone(),
572 is_staff: body.is_staff,
573 time: time.timestamp_millis(),
574 file_extension: event.file_extension.unwrap_or_default(),
575 signed_in: wrapper.signed_in,
576 country_code: country_code.unwrap_or("XX".to_string()),
577 region_code: "".to_string(),
578 city: "".to_string(),
579 suggestion_id: event.suggestion_id.unwrap_or_default(),
580 suggestion_accepted: event.suggestion_accepted,
581 }
582 }
583}
584
585#[derive(Serialize, Debug, clickhouse::Row)]
586pub struct CallEventRow {
587 // AppInfoBase
588 app_version: String,
589 major: Option<i32>,
590 minor: Option<i32>,
591 patch: Option<i32>,
592 release_channel: String,
593
594 // ClientEventBase
595 installation_id: String,
596 session_id: Option<String>,
597 is_staff: Option<bool>,
598 time: i64,
599
600 // CallEventRow
601 operation: String,
602 room_id: Option<u64>,
603 channel_id: Option<u64>,
604}
605
606impl CallEventRow {
607 fn from_event(
608 event: CallEvent,
609 wrapper: &EventWrapper,
610 body: &EventRequestBody,
611 first_event_at: chrono::DateTime<chrono::Utc>,
612 ) -> Self {
613 let semver = body.semver();
614 let time =
615 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
616
617 Self {
618 app_version: body.app_version.clone(),
619 major: semver.map(|s| s.major as i32),
620 minor: semver.map(|s| s.minor as i32),
621 patch: semver.map(|s| s.patch as i32),
622 release_channel: body.release_channel.clone().unwrap_or_default(),
623 installation_id: body.installation_id.clone().unwrap_or_default(),
624 session_id: body.session_id.clone(),
625 is_staff: body.is_staff,
626 time: time.timestamp_millis(),
627 operation: event.operation,
628 room_id: event.room_id,
629 channel_id: event.channel_id,
630 }
631 }
632}
633
634#[derive(Serialize, Debug, clickhouse::Row)]
635pub struct AssistantEventRow {
636 // AppInfoBase
637 app_version: String,
638 major: Option<i32>,
639 minor: Option<i32>,
640 patch: Option<i32>,
641 release_channel: String,
642
643 // ClientEventBase
644 installation_id: Option<String>,
645 session_id: Option<String>,
646 is_staff: Option<bool>,
647 time: i64,
648
649 // AssistantEventRow
650 conversation_id: String,
651 kind: String,
652 model: String,
653}
654
655impl AssistantEventRow {
656 fn from_event(
657 event: AssistantEvent,
658 wrapper: &EventWrapper,
659 body: &EventRequestBody,
660 first_event_at: chrono::DateTime<chrono::Utc>,
661 ) -> Self {
662 let semver = body.semver();
663 let time =
664 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
665
666 Self {
667 app_version: body.app_version.clone(),
668 major: semver.map(|s| s.major as i32),
669 minor: semver.map(|s| s.minor as i32),
670 patch: semver.map(|s| s.patch as i32),
671 release_channel: body.release_channel.clone().unwrap_or_default(),
672 installation_id: body.installation_id.clone(),
673 session_id: body.session_id.clone(),
674 is_staff: body.is_staff,
675 time: time.timestamp_millis(),
676 conversation_id: event.conversation_id.unwrap_or_default(),
677 kind: event.kind.to_string(),
678 model: event.model,
679 }
680 }
681}
682
683#[derive(Debug, clickhouse::Row, Serialize)]
684pub struct CpuEventRow {
685 pub installation_id: Option<String>,
686 pub is_staff: Option<bool>,
687 pub usage_as_percentage: f32,
688 pub core_count: u32,
689 pub app_version: String,
690 pub release_channel: String,
691 pub time: i64,
692 pub session_id: Option<String>,
693 // pub normalized_cpu_usage: f64, MATERIALIZED
694 pub major: Option<i32>,
695 pub minor: Option<i32>,
696 pub patch: Option<i32>,
697}
698
699impl CpuEventRow {
700 fn from_event(
701 event: CpuEvent,
702 wrapper: &EventWrapper,
703 body: &EventRequestBody,
704 first_event_at: chrono::DateTime<chrono::Utc>,
705 ) -> Self {
706 let semver = body.semver();
707 let time =
708 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
709
710 Self {
711 app_version: body.app_version.clone(),
712 major: semver.map(|s| s.major as i32),
713 minor: semver.map(|s| s.minor as i32),
714 patch: semver.map(|s| s.patch as i32),
715 release_channel: body.release_channel.clone().unwrap_or_default(),
716 installation_id: body.installation_id.clone(),
717 session_id: body.session_id.clone(),
718 is_staff: body.is_staff,
719 time: time.timestamp_millis(),
720 usage_as_percentage: event.usage_as_percentage,
721 core_count: event.core_count,
722 }
723 }
724}
725
726#[derive(Serialize, Debug, clickhouse::Row)]
727pub struct MemoryEventRow {
728 // AppInfoBase
729 app_version: String,
730 major: Option<i32>,
731 minor: Option<i32>,
732 patch: Option<i32>,
733 release_channel: String,
734
735 // ClientEventBase
736 installation_id: Option<String>,
737 session_id: Option<String>,
738 is_staff: Option<bool>,
739 time: i64,
740
741 // MemoryEventRow
742 memory_in_bytes: u64,
743 virtual_memory_in_bytes: u64,
744}
745
746impl MemoryEventRow {
747 fn from_event(
748 event: MemoryEvent,
749 wrapper: &EventWrapper,
750 body: &EventRequestBody,
751 first_event_at: chrono::DateTime<chrono::Utc>,
752 ) -> Self {
753 let semver = body.semver();
754 let time =
755 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
756
757 Self {
758 app_version: body.app_version.clone(),
759 major: semver.map(|s| s.major as i32),
760 minor: semver.map(|s| s.minor as i32),
761 patch: semver.map(|s| s.patch as i32),
762 release_channel: body.release_channel.clone().unwrap_or_default(),
763 installation_id: body.installation_id.clone(),
764 session_id: body.session_id.clone(),
765 is_staff: body.is_staff,
766 time: time.timestamp_millis(),
767 memory_in_bytes: event.memory_in_bytes,
768 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
769 }
770 }
771}
772
773#[derive(Serialize, Debug, clickhouse::Row)]
774pub struct AppEventRow {
775 // AppInfoBase
776 app_version: String,
777 major: Option<i32>,
778 minor: Option<i32>,
779 patch: Option<i32>,
780 release_channel: String,
781
782 // ClientEventBase
783 installation_id: Option<String>,
784 session_id: Option<String>,
785 is_staff: Option<bool>,
786 time: i64,
787
788 // AppEventRow
789 operation: String,
790}
791
792impl AppEventRow {
793 fn from_event(
794 event: AppEvent,
795 wrapper: &EventWrapper,
796 body: &EventRequestBody,
797 first_event_at: chrono::DateTime<chrono::Utc>,
798 ) -> Self {
799 let semver = body.semver();
800 let time =
801 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
802
803 Self {
804 app_version: body.app_version.clone(),
805 major: semver.map(|s| s.major as i32),
806 minor: semver.map(|s| s.minor as i32),
807 patch: semver.map(|s| s.patch as i32),
808 release_channel: body.release_channel.clone().unwrap_or_default(),
809 installation_id: body.installation_id.clone(),
810 session_id: body.session_id.clone(),
811 is_staff: body.is_staff,
812 time: time.timestamp_millis(),
813 operation: event.operation,
814 }
815 }
816}
817
818#[derive(Serialize, Debug, clickhouse::Row)]
819pub struct SettingEventRow {
820 // AppInfoBase
821 app_version: String,
822 major: Option<i32>,
823 minor: Option<i32>,
824 patch: Option<i32>,
825 release_channel: String,
826
827 // ClientEventBase
828 installation_id: Option<String>,
829 session_id: Option<String>,
830 is_staff: Option<bool>,
831 time: i64,
832 // SettingEventRow
833 setting: String,
834 value: String,
835}
836
837impl SettingEventRow {
838 fn from_event(
839 event: SettingEvent,
840 wrapper: &EventWrapper,
841 body: &EventRequestBody,
842 first_event_at: chrono::DateTime<chrono::Utc>,
843 ) -> Self {
844 let semver = body.semver();
845 let time =
846 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
847
848 Self {
849 app_version: body.app_version.clone(),
850 major: semver.map(|s| s.major as i32),
851 minor: semver.map(|s| s.minor as i32),
852 patch: semver.map(|s| s.patch as i32),
853 release_channel: body.release_channel.clone().unwrap_or_default(),
854 installation_id: body.installation_id.clone(),
855 session_id: body.session_id.clone(),
856 is_staff: body.is_staff,
857 time: time.timestamp_millis(),
858 setting: event.setting,
859 value: event.value,
860 }
861 }
862}
863
864#[derive(Serialize, Debug, clickhouse::Row)]
865pub struct EditEventRow {
866 // AppInfoBase
867 app_version: String,
868 major: Option<i32>,
869 minor: Option<i32>,
870 patch: Option<i32>,
871 release_channel: String,
872
873 // ClientEventBase
874 installation_id: Option<String>,
875 // Note: This column name has a typo in the ClickHouse table.
876 #[serde(rename = "sesssion_id")]
877 session_id: Option<String>,
878 is_staff: Option<bool>,
879 time: i64,
880
881 // EditEventRow
882 period_start: i64,
883 period_end: i64,
884 environment: String,
885}
886
887impl EditEventRow {
888 fn from_event(
889 event: EditEvent,
890 wrapper: &EventWrapper,
891 body: &EventRequestBody,
892 first_event_at: chrono::DateTime<chrono::Utc>,
893 ) -> Self {
894 let semver = body.semver();
895 let time =
896 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
897
898 let period_start = time - chrono::Duration::milliseconds(event.duration);
899 let period_end = time;
900
901 Self {
902 app_version: body.app_version.clone(),
903 major: semver.map(|s| s.major as i32),
904 minor: semver.map(|s| s.minor as i32),
905 patch: semver.map(|s| s.patch as i32),
906 release_channel: body.release_channel.clone().unwrap_or_default(),
907 installation_id: body.installation_id.clone(),
908 session_id: body.session_id.clone(),
909 is_staff: body.is_staff,
910 time: time.timestamp_millis(),
911 period_start: period_start.timestamp_millis(),
912 period_end: period_end.timestamp_millis(),
913 environment: event.environment,
914 }
915 }
916}
917
918#[derive(Serialize, Debug, clickhouse::Row)]
919pub struct ActionEventRow {
920 // AppInfoBase
921 app_version: String,
922 major: Option<i32>,
923 minor: Option<i32>,
924 patch: Option<i32>,
925 release_channel: String,
926
927 // ClientEventBase
928 installation_id: Option<String>,
929 // Note: This column name has a typo in the ClickHouse table.
930 #[serde(rename = "sesssion_id")]
931 session_id: Option<String>,
932 is_staff: Option<bool>,
933 time: i64,
934 // ActionEventRow
935 source: String,
936 action: String,
937}
938
939impl ActionEventRow {
940 fn from_event(
941 event: ActionEvent,
942 wrapper: &EventWrapper,
943 body: &EventRequestBody,
944 first_event_at: chrono::DateTime<chrono::Utc>,
945 ) -> Self {
946 let semver = body.semver();
947 let time =
948 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
949
950 Self {
951 app_version: body.app_version.clone(),
952 major: semver.map(|s| s.major as i32),
953 minor: semver.map(|s| s.minor as i32),
954 patch: semver.map(|s| s.patch as i32),
955 release_channel: body.release_channel.clone().unwrap_or_default(),
956 installation_id: body.installation_id.clone(),
957 session_id: body.session_id.clone(),
958 is_staff: body.is_staff,
959 time: time.timestamp_millis(),
960 source: event.source,
961 action: event.action,
962 }
963 }
964}