1use super::ips_file::IpsFile;
2use crate::{api::slack, AppState, Error, Result};
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use rpc::ExtensionMetadata;
13use serde::{Serialize, Serializer};
14use sha2::{Digest, Sha256};
15use std::sync::{Arc, OnceLock};
16use telemetry_events::{
17 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
18 EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
19};
20use util::SemanticVersion;
21
22pub fn router() -> Router {
23 Router::new()
24 .route("/telemetry/events", post(post_events))
25 .route("/telemetry/crashes", post(post_crash))
26}
27
28pub struct ZedChecksumHeader(Vec<u8>);
29
30impl Header for ZedChecksumHeader {
31 fn name() -> &'static HeaderName {
32 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
33 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
34 }
35
36 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
37 where
38 Self: Sized,
39 I: Iterator<Item = &'i axum::http::HeaderValue>,
40 {
41 let checksum = values
42 .next()
43 .ok_or_else(axum::headers::Error::invalid)?
44 .to_str()
45 .map_err(|_| axum::headers::Error::invalid())?;
46
47 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
48 Ok(Self(bytes))
49 }
50
51 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
52 unimplemented!()
53 }
54}
55
56pub struct CloudflareIpCountryHeader(String);
57
58impl Header for CloudflareIpCountryHeader {
59 fn name() -> &'static HeaderName {
60 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
61 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
62 }
63
64 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
65 where
66 Self: Sized,
67 I: Iterator<Item = &'i axum::http::HeaderValue>,
68 {
69 let country_code = values
70 .next()
71 .ok_or_else(axum::headers::Error::invalid)?
72 .to_str()
73 .map_err(|_| axum::headers::Error::invalid())?;
74
75 Ok(Self(country_code.to_string()))
76 }
77
78 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
79 unimplemented!()
80 }
81}
82
83pub async fn post_crash(
84 Extension(app): Extension<Arc<AppState>>,
85 headers: HeaderMap,
86 body: Bytes,
87) -> Result<()> {
88 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
89
90 let report = IpsFile::parse(&body)?;
91 let version_threshold = SemanticVersion::new(0, 123, 0);
92
93 let bundle_id = &report.header.bundle_id;
94 let app_version = &report.app_version();
95
96 if bundle_id == "dev.zed.Zed-Dev" {
97 log::error!("Crash uploads from {} are ignored.", bundle_id);
98 return Ok(());
99 }
100
101 if app_version.is_none() || app_version.unwrap() < version_threshold {
102 log::error!(
103 "Crash uploads from {} are ignored.",
104 report.header.app_version
105 );
106 return Ok(());
107 }
108 let app_version = app_version.unwrap();
109
110 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
111 let response = blob_store_client
112 .head_object()
113 .bucket(CRASH_REPORTS_BUCKET)
114 .key(report.header.incident_id.clone() + ".ips")
115 .send()
116 .await;
117
118 if response.is_ok() {
119 log::info!("We've already uploaded this crash");
120 return Ok(());
121 }
122
123 blob_store_client
124 .put_object()
125 .bucket(CRASH_REPORTS_BUCKET)
126 .key(report.header.incident_id.clone() + ".ips")
127 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
128 .body(ByteStream::from(body.to_vec()))
129 .send()
130 .await
131 .map_err(|e| log::error!("Failed to upload crash: {}", e))
132 .ok();
133 }
134
135 let recent_panic_on: Option<i64> = headers
136 .get("x-zed-panicked-on")
137 .and_then(|h| h.to_str().ok())
138 .and_then(|s| s.parse().ok());
139 let mut recent_panic = None;
140
141 if let Some(recent_panic_on) = recent_panic_on {
142 let crashed_at = match report.timestamp() {
143 Ok(t) => Some(t),
144 Err(e) => {
145 log::error!("Can't parse {}: {}", report.header.timestamp, e);
146 None
147 }
148 };
149 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
150 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
151 }
152 }
153
154 let description = report.description(recent_panic);
155 let summary = report.backtrace_summary();
156
157 tracing::error!(
158 service = "client",
159 version = %report.header.app_version,
160 os_version = %report.header.os_version,
161 bundle_id = %report.header.bundle_id,
162 incident_id = %report.header.incident_id,
163 description = %description,
164 backtrace = %summary,
165 "crash report");
166
167 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
168 let payload = slack::WebhookBody::new(|w| {
169 w.add_section(|s| s.text(slack::Text::markdown(description)))
170 .add_section(|s| {
171 s.add_field(slack::Text::markdown(format!(
172 "*Version:*\n{} ({})",
173 bundle_id, app_version
174 )))
175 .add_field({
176 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
177 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
178 hostname.strip_prefix("http://").unwrap_or_default()
179 });
180
181 slack::Text::markdown(format!(
182 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
183 CRASH_REPORTS_BUCKET,
184 hostname,
185 report.header.incident_id,
186 report
187 .header
188 .incident_id
189 .chars()
190 .take(8)
191 .collect::<String>(),
192 ))
193 })
194 })
195 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
196 });
197 let payload_json = serde_json::to_string(&payload).map_err(|err| {
198 log::error!("Failed to serialize payload to JSON: {err}");
199 Error::Internal(anyhow!(err))
200 })?;
201
202 reqwest::Client::new()
203 .post(slack_panics_webhook)
204 .header("Content-Type", "application/json")
205 .body(payload_json)
206 .send()
207 .await
208 .map_err(|err| {
209 log::error!("Failed to send payload to Slack: {err}");
210 Error::Internal(anyhow!(err))
211 })?;
212 }
213
214 Ok(())
215}
216
217pub async fn post_events(
218 Extension(app): Extension<Arc<AppState>>,
219 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
220 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
221 body: Bytes,
222) -> Result<()> {
223 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
224 Err(Error::Http(
225 StatusCode::NOT_IMPLEMENTED,
226 "not supported".into(),
227 ))?
228 };
229
230 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
231 return Err(Error::Http(
232 StatusCode::INTERNAL_SERVER_ERROR,
233 "events not enabled".into(),
234 ))?;
235 };
236
237 let mut summer = Sha256::new();
238 summer.update(checksum_seed);
239 summer.update(&body);
240 summer.update(checksum_seed);
241
242 if &checksum != &summer.finalize()[..] {
243 return Err(Error::Http(
244 StatusCode::BAD_REQUEST,
245 "invalid checksum".into(),
246 ))?;
247 }
248
249 let request_body: telemetry_events::EventRequestBody =
250 serde_json::from_slice(&body).map_err(|err| {
251 log::error!("can't parse event json: {err}");
252 Error::Internal(anyhow!(err))
253 })?;
254
255 let mut to_upload = ToUpload::default();
256 let Some(last_event) = request_body.events.last() else {
257 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
258 };
259 let country_code = country_code_header.map(|h| h.0 .0);
260
261 let first_event_at = chrono::Utc::now()
262 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
263
264 for wrapper in &request_body.events {
265 match &wrapper.event {
266 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
267 event.clone(),
268 &wrapper,
269 &request_body,
270 first_event_at,
271 country_code.clone(),
272 )),
273 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
274 event.clone(),
275 &wrapper,
276 &request_body,
277 first_event_at,
278 country_code.clone(),
279 )),
280 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
281 event.clone(),
282 &wrapper,
283 &request_body,
284 first_event_at,
285 )),
286 Event::Assistant(event) => {
287 to_upload
288 .assistant_events
289 .push(AssistantEventRow::from_event(
290 event.clone(),
291 &wrapper,
292 &request_body,
293 first_event_at,
294 ))
295 }
296 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
297 event.clone(),
298 &wrapper,
299 &request_body,
300 first_event_at,
301 )),
302 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
303 event.clone(),
304 &wrapper,
305 &request_body,
306 first_event_at,
307 )),
308 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
309 event.clone(),
310 &wrapper,
311 &request_body,
312 first_event_at,
313 )),
314 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
315 event.clone(),
316 &wrapper,
317 &request_body,
318 first_event_at,
319 )),
320 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
321 event.clone(),
322 &wrapper,
323 &request_body,
324 first_event_at,
325 )),
326 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
327 event.clone(),
328 &wrapper,
329 &request_body,
330 first_event_at,
331 )),
332 Event::Extension(event) => {
333 let metadata = app
334 .db
335 .get_extension_version(&event.extension_id, &event.version)
336 .await?;
337 to_upload
338 .extension_events
339 .push(ExtensionEventRow::from_event(
340 event.clone(),
341 &wrapper,
342 &request_body,
343 metadata,
344 first_event_at,
345 ))
346 }
347 }
348 }
349
350 to_upload
351 .upload(&clickhouse_client)
352 .await
353 .map_err(|err| Error::Internal(anyhow!(err)))?;
354
355 Ok(())
356}
357
/// Rows collected from a single telemetry request, grouped by event kind.
///
/// `post_events` pushes one row per incoming event into the matching vector;
/// `ToUpload::upload` then inserts each non-empty vector into its own
/// ClickHouse table.
#[derive(Default)]
struct ToUpload {
    editor_events: Vec<EditorEventRow>,
    copilot_events: Vec<CopilotEventRow>,
    assistant_events: Vec<AssistantEventRow>,
    call_events: Vec<CallEventRow>,
    cpu_events: Vec<CpuEventRow>,
    memory_events: Vec<MemoryEventRow>,
    app_events: Vec<AppEventRow>,
    setting_events: Vec<SettingEventRow>,
    extension_events: Vec<ExtensionEventRow>,
    edit_events: Vec<EditEventRow>,
    action_events: Vec<ActionEventRow>,
}
372
373impl ToUpload {
374 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
375 const EDITOR_EVENTS_TABLE: &str = "editor_events";
376 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
377 .await
378 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
379
380 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
381 Self::upload_to_table(
382 COPILOT_EVENTS_TABLE,
383 &self.copilot_events,
384 clickhouse_client,
385 )
386 .await
387 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
388
389 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
390 Self::upload_to_table(
391 ASSISTANT_EVENTS_TABLE,
392 &self.assistant_events,
393 clickhouse_client,
394 )
395 .await
396 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
397
398 const CALL_EVENTS_TABLE: &str = "call_events";
399 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
400 .await
401 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
402
403 const CPU_EVENTS_TABLE: &str = "cpu_events";
404 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
405 .await
406 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
407
408 const MEMORY_EVENTS_TABLE: &str = "memory_events";
409 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
410 .await
411 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
412
413 const APP_EVENTS_TABLE: &str = "app_events";
414 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
415 .await
416 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
417
418 const SETTING_EVENTS_TABLE: &str = "setting_events";
419 Self::upload_to_table(
420 SETTING_EVENTS_TABLE,
421 &self.setting_events,
422 clickhouse_client,
423 )
424 .await
425 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
426
427 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
428 Self::upload_to_table(
429 EXTENSION_EVENTS_TABLE,
430 &self.extension_events,
431 clickhouse_client,
432 )
433 .await
434 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
435
436 const EDIT_EVENTS_TABLE: &str = "edit_events";
437 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
438 .await
439 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
440
441 const ACTION_EVENTS_TABLE: &str = "action_events";
442 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
443 .await
444 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
445
446 Ok(())
447 }
448
449 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
450 table: &str,
451 rows: &[T],
452 clickhouse_client: &clickhouse::Client,
453 ) -> anyhow::Result<()> {
454 if !rows.is_empty() {
455 let mut insert = clickhouse_client.insert(table)?;
456
457 for event in rows {
458 insert.write(event).await?;
459 }
460
461 insert.end().await?;
462 }
463
464 Ok(())
465 }
466}
467
468pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
469where
470 S: Serializer,
471{
472 if country_code.len() != 2 {
473 use serde::ser::Error;
474 return Err(S::Error::custom(
475 "country_code must be exactly 2 characters",
476 ));
477 }
478
479 let country_code = country_code.as_bytes();
480
481 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
482}
483
/// One row of the `editor_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `EditorEventRow::from_event` from an `EditorEvent` plus the
/// metadata of the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditorEventRow {
    pub installation_id: String,
    pub operation: String,
    pub app_version: String,
    pub file_extension: String,
    pub os_name: String,
    pub os_version: String,
    pub release_channel: String,
    pub signed_in: bool,
    pub vim_mode: bool,
    // Serialized as a `u16` built from the two bytes of the code.
    #[serde(serialize_with = "serialize_country_code")]
    pub country_code: String,
    pub region_code: String,
    pub city: String,
    // Milliseconds since the Unix epoch.
    pub time: i64,
    pub copilot_enabled: bool,
    pub copilot_enabled_for_language: bool,
    pub historical_event: bool,
    pub architecture: String,
    pub is_staff: Option<bool>,
    pub session_id: Option<String>,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
510
511impl EditorEventRow {
512 fn from_event(
513 event: EditorEvent,
514 wrapper: &EventWrapper,
515 body: &EventRequestBody,
516 first_event_at: chrono::DateTime<chrono::Utc>,
517 country_code: Option<String>,
518 ) -> Self {
519 let semver = body.semver();
520 let time =
521 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
522
523 Self {
524 app_version: body.app_version.clone(),
525 major: semver.map(|s| s.major as i32),
526 minor: semver.map(|s| s.minor as i32),
527 patch: semver.map(|s| s.patch as i32),
528 release_channel: body.release_channel.clone().unwrap_or_default(),
529 os_name: body.os_name.clone(),
530 os_version: body.os_version.clone().unwrap_or_default(),
531 architecture: body.architecture.clone(),
532 installation_id: body.installation_id.clone().unwrap_or_default(),
533 session_id: body.session_id.clone(),
534 is_staff: body.is_staff,
535 time: time.timestamp_millis(),
536 operation: event.operation,
537 file_extension: event.file_extension.unwrap_or_default(),
538 signed_in: wrapper.signed_in,
539 vim_mode: event.vim_mode,
540 copilot_enabled: event.copilot_enabled,
541 copilot_enabled_for_language: event.copilot_enabled_for_language,
542 country_code: country_code.unwrap_or("XX".to_string()),
543 region_code: "".to_string(),
544 city: "".to_string(),
545 historical_event: false,
546 }
547 }
548}
549
/// One row of the `copilot_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `CopilotEventRow::from_event` from a `CopilotEvent` plus the
/// metadata of the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct CopilotEventRow {
    pub installation_id: String,
    pub suggestion_id: String,
    pub suggestion_accepted: bool,
    pub app_version: String,
    pub file_extension: String,
    pub os_name: String,
    pub os_version: String,
    pub release_channel: String,
    pub signed_in: bool,
    // Serialized as a `u16` built from the two bytes of the code.
    #[serde(serialize_with = "serialize_country_code")]
    pub country_code: String,
    pub region_code: String,
    pub city: String,
    // Milliseconds since the Unix epoch.
    pub time: i64,
    pub is_staff: Option<bool>,
    pub session_id: Option<String>,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
572
573impl CopilotEventRow {
574 fn from_event(
575 event: CopilotEvent,
576 wrapper: &EventWrapper,
577 body: &EventRequestBody,
578 first_event_at: chrono::DateTime<chrono::Utc>,
579 country_code: Option<String>,
580 ) -> Self {
581 let semver = body.semver();
582 let time =
583 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
584
585 Self {
586 app_version: body.app_version.clone(),
587 major: semver.map(|s| s.major as i32),
588 minor: semver.map(|s| s.minor as i32),
589 patch: semver.map(|s| s.patch as i32),
590 release_channel: body.release_channel.clone().unwrap_or_default(),
591 os_name: body.os_name.clone(),
592 os_version: body.os_version.clone().unwrap_or_default(),
593 installation_id: body.installation_id.clone().unwrap_or_default(),
594 session_id: body.session_id.clone(),
595 is_staff: body.is_staff,
596 time: time.timestamp_millis(),
597 file_extension: event.file_extension.unwrap_or_default(),
598 signed_in: wrapper.signed_in,
599 country_code: country_code.unwrap_or("XX".to_string()),
600 region_code: "".to_string(),
601 city: "".to_string(),
602 suggestion_id: event.suggestion_id.unwrap_or_default(),
603 suggestion_accepted: event.suggestion_accepted,
604 }
605 }
606}
607
/// One row of the `call_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `CallEventRow::from_event` from a `CallEvent` plus the metadata
/// of the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct CallEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    // Unlike most other rows, a missing installation id is stored as an empty string.
    installation_id: String,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,

    // CallEventRow
    operation: String,
    room_id: Option<u64>,
    channel_id: Option<u64>,
}
628
629impl CallEventRow {
630 fn from_event(
631 event: CallEvent,
632 wrapper: &EventWrapper,
633 body: &EventRequestBody,
634 first_event_at: chrono::DateTime<chrono::Utc>,
635 ) -> Self {
636 let semver = body.semver();
637 let time =
638 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
639
640 Self {
641 app_version: body.app_version.clone(),
642 major: semver.map(|s| s.major as i32),
643 minor: semver.map(|s| s.minor as i32),
644 patch: semver.map(|s| s.patch as i32),
645 release_channel: body.release_channel.clone().unwrap_or_default(),
646 installation_id: body.installation_id.clone().unwrap_or_default(),
647 session_id: body.session_id.clone(),
648 is_staff: body.is_staff,
649 time: time.timestamp_millis(),
650 operation: event.operation,
651 room_id: event.room_id,
652 channel_id: event.channel_id,
653 }
654 }
655}
656
/// One row of the `assistant_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `AssistantEventRow::from_event` from an `AssistantEvent` plus the
/// metadata of the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AssistantEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,

    // AssistantEventRow
    // Empty string when the event carries no conversation id.
    conversation_id: String,
    kind: String,
    model: String,
}
677
678impl AssistantEventRow {
679 fn from_event(
680 event: AssistantEvent,
681 wrapper: &EventWrapper,
682 body: &EventRequestBody,
683 first_event_at: chrono::DateTime<chrono::Utc>,
684 ) -> Self {
685 let semver = body.semver();
686 let time =
687 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
688
689 Self {
690 app_version: body.app_version.clone(),
691 major: semver.map(|s| s.major as i32),
692 minor: semver.map(|s| s.minor as i32),
693 patch: semver.map(|s| s.patch as i32),
694 release_channel: body.release_channel.clone().unwrap_or_default(),
695 installation_id: body.installation_id.clone(),
696 session_id: body.session_id.clone(),
697 is_staff: body.is_staff,
698 time: time.timestamp_millis(),
699 conversation_id: event.conversation_id.unwrap_or_default(),
700 kind: event.kind.to_string(),
701 model: event.model,
702 }
703 }
704}
705
/// One row of the `cpu_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `CpuEventRow::from_event` from a `CpuEvent` plus the metadata of
/// the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Debug, clickhouse::Row, Serialize)]
pub struct CpuEventRow {
    pub installation_id: Option<String>,
    pub is_staff: Option<bool>,
    pub usage_as_percentage: f32,
    pub core_count: u32,
    pub app_version: String,
    pub release_channel: String,
    // Milliseconds since the Unix epoch.
    pub time: i64,
    pub session_id: Option<String>,
    // pub normalized_cpu_usage: f64, MATERIALIZED
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
721
722impl CpuEventRow {
723 fn from_event(
724 event: CpuEvent,
725 wrapper: &EventWrapper,
726 body: &EventRequestBody,
727 first_event_at: chrono::DateTime<chrono::Utc>,
728 ) -> Self {
729 let semver = body.semver();
730 let time =
731 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
732
733 Self {
734 app_version: body.app_version.clone(),
735 major: semver.map(|s| s.major as i32),
736 minor: semver.map(|s| s.minor as i32),
737 patch: semver.map(|s| s.patch as i32),
738 release_channel: body.release_channel.clone().unwrap_or_default(),
739 installation_id: body.installation_id.clone(),
740 session_id: body.session_id.clone(),
741 is_staff: body.is_staff,
742 time: time.timestamp_millis(),
743 usage_as_percentage: event.usage_as_percentage,
744 core_count: event.core_count,
745 }
746 }
747}
748
/// One row of the `memory_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `MemoryEventRow::from_event` from a `MemoryEvent` plus the
/// metadata of the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct MemoryEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,

    // MemoryEventRow
    memory_in_bytes: u64,
    virtual_memory_in_bytes: u64,
}
768
769impl MemoryEventRow {
770 fn from_event(
771 event: MemoryEvent,
772 wrapper: &EventWrapper,
773 body: &EventRequestBody,
774 first_event_at: chrono::DateTime<chrono::Utc>,
775 ) -> Self {
776 let semver = body.semver();
777 let time =
778 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
779
780 Self {
781 app_version: body.app_version.clone(),
782 major: semver.map(|s| s.major as i32),
783 minor: semver.map(|s| s.minor as i32),
784 patch: semver.map(|s| s.patch as i32),
785 release_channel: body.release_channel.clone().unwrap_or_default(),
786 installation_id: body.installation_id.clone(),
787 session_id: body.session_id.clone(),
788 is_staff: body.is_staff,
789 time: time.timestamp_millis(),
790 memory_in_bytes: event.memory_in_bytes,
791 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
792 }
793 }
794}
795
/// One row of the `app_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `AppEventRow::from_event` from an `AppEvent` plus the metadata of
/// the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AppEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,

    // AppEventRow
    operation: String,
}
814
815impl AppEventRow {
816 fn from_event(
817 event: AppEvent,
818 wrapper: &EventWrapper,
819 body: &EventRequestBody,
820 first_event_at: chrono::DateTime<chrono::Utc>,
821 ) -> Self {
822 let semver = body.semver();
823 let time =
824 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
825
826 Self {
827 app_version: body.app_version.clone(),
828 major: semver.map(|s| s.major as i32),
829 minor: semver.map(|s| s.minor as i32),
830 patch: semver.map(|s| s.patch as i32),
831 release_channel: body.release_channel.clone().unwrap_or_default(),
832 installation_id: body.installation_id.clone(),
833 session_id: body.session_id.clone(),
834 is_staff: body.is_staff,
835 time: time.timestamp_millis(),
836 operation: event.operation,
837 }
838 }
839}
840
/// One row of the `setting_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `SettingEventRow::from_event` from a `SettingEvent` plus the
/// metadata of the request it arrived in.
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,
    // SettingEventRow
    setting: String,
    value: String,
}
859
860impl SettingEventRow {
861 fn from_event(
862 event: SettingEvent,
863 wrapper: &EventWrapper,
864 body: &EventRequestBody,
865 first_event_at: chrono::DateTime<chrono::Utc>,
866 ) -> Self {
867 let semver = body.semver();
868 let time =
869 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
870
871 Self {
872 app_version: body.app_version.clone(),
873 major: semver.map(|s| s.major as i32),
874 minor: semver.map(|s| s.minor as i32),
875 patch: semver.map(|s| s.patch as i32),
876 release_channel: body.release_channel.clone().unwrap_or_default(),
877 installation_id: body.installation_id.clone(),
878 session_id: body.session_id.clone(),
879 is_staff: body.is_staff,
880 time: time.timestamp_millis(),
881 setting: event.setting,
882 value: event.value,
883 }
884 }
885}
886
/// One row of the `extension_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `ExtensionEventRow::from_event` from an `ExtensionEvent`, the
/// metadata of the request it arrived in, and the extension metadata looked
/// up for that extension id and version (if any).
// NOTE(review): field names presumably double as ClickHouse column names —
// confirm against the table schema before renaming or retyping anything.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,

    // ExtensionEventRow
    extension_id: Arc<str>,
    extension_version: Arc<str>,
    // `true` when no extension metadata was supplied to `from_event`.
    dev: bool,
    // Both taken from the extension manifest; `None` without metadata.
    schema_version: Option<i32>,
    wasm_api_version: Option<String>,
}
909
910impl ExtensionEventRow {
911 fn from_event(
912 event: ExtensionEvent,
913 wrapper: &EventWrapper,
914 body: &EventRequestBody,
915 extension_metadata: Option<ExtensionMetadata>,
916 first_event_at: chrono::DateTime<chrono::Utc>,
917 ) -> Self {
918 let semver = body.semver();
919 let time =
920 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
921
922 Self {
923 app_version: body.app_version.clone(),
924 major: semver.map(|s| s.major as i32),
925 minor: semver.map(|s| s.minor as i32),
926 patch: semver.map(|s| s.patch as i32),
927 release_channel: body.release_channel.clone().unwrap_or_default(),
928 installation_id: body.installation_id.clone(),
929 session_id: body.session_id.clone(),
930 is_staff: body.is_staff,
931 time: time.timestamp_millis(),
932 extension_id: event.extension_id,
933 extension_version: event.version,
934 dev: extension_metadata.is_none(),
935 schema_version: extension_metadata
936 .as_ref()
937 .and_then(|metadata| metadata.manifest.schema_version),
938 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
939 metadata
940 .manifest
941 .wasm_api_version
942 .as_ref()
943 .map(|version| version.to_string())
944 }),
945 }
946 }
947}
948
/// One row of the `edit_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `EditEventRow::from_event` from an `EditEvent` plus the metadata
/// of the request it arrived in.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,

    // EditEventRow
    // Start and end of the edit period, in milliseconds since the Unix epoch;
    // `period_end` equals `time` and `period_start` is `time` minus the event's duration.
    period_start: i64,
    period_end: i64,
    environment: String,
}
971
972impl EditEventRow {
973 fn from_event(
974 event: EditEvent,
975 wrapper: &EventWrapper,
976 body: &EventRequestBody,
977 first_event_at: chrono::DateTime<chrono::Utc>,
978 ) -> Self {
979 let semver = body.semver();
980 let time =
981 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
982
983 let period_start = time - chrono::Duration::milliseconds(event.duration);
984 let period_end = time;
985
986 Self {
987 app_version: body.app_version.clone(),
988 major: semver.map(|s| s.major as i32),
989 minor: semver.map(|s| s.minor as i32),
990 patch: semver.map(|s| s.patch as i32),
991 release_channel: body.release_channel.clone().unwrap_or_default(),
992 installation_id: body.installation_id.clone(),
993 session_id: body.session_id.clone(),
994 is_staff: body.is_staff,
995 time: time.timestamp_millis(),
996 period_start: period_start.timestamp_millis(),
997 period_end: period_end.timestamp_millis(),
998 environment: event.environment,
999 }
1000 }
1001}
1002
/// One row of the `action_events` ClickHouse table (see `ToUpload::upload`).
///
/// Built by `ActionEventRow::from_event` from an `ActionEvent` plus the
/// metadata of the request it arrived in.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Milliseconds since the Unix epoch.
    time: i64,
    // ActionEventRow
    source: String,
    action: String,
}
1023
1024impl ActionEventRow {
1025 fn from_event(
1026 event: ActionEvent,
1027 wrapper: &EventWrapper,
1028 body: &EventRequestBody,
1029 first_event_at: chrono::DateTime<chrono::Utc>,
1030 ) -> Self {
1031 let semver = body.semver();
1032 let time =
1033 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1034
1035 Self {
1036 app_version: body.app_version.clone(),
1037 major: semver.map(|s| s.major as i32),
1038 minor: semver.map(|s| s.minor as i32),
1039 patch: semver.map(|s| s.patch as i32),
1040 release_channel: body.release_channel.clone().unwrap_or_default(),
1041 installation_id: body.installation_id.clone(),
1042 session_id: body.session_id.clone(),
1043 is_staff: body.is_staff,
1044 time: time.timestamp_millis(),
1045 source: event.source,
1046 action: event.action,
1047 }
1048 }
1049}