1use super::ips_file::IpsFile;
2use crate::{api::slack, AppState, Error, Result};
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use rpc::ExtensionMetadata;
13use semantic_version::SemanticVersion;
14use serde::{Serialize, Serializer};
15use sha2::{Digest, Sha256};
16use std::sync::{Arc, OnceLock};
17use telemetry_events::{
18 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
19 EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
20};
21
22pub fn router() -> Router {
23 Router::new()
24 .route("/telemetry/events", post(post_events))
25 .route("/telemetry/crashes", post(post_crash))
26}
27
28pub struct ZedChecksumHeader(Vec<u8>);
29
30impl Header for ZedChecksumHeader {
31 fn name() -> &'static HeaderName {
32 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
33 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
34 }
35
36 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
37 where
38 Self: Sized,
39 I: Iterator<Item = &'i axum::http::HeaderValue>,
40 {
41 let checksum = values
42 .next()
43 .ok_or_else(axum::headers::Error::invalid)?
44 .to_str()
45 .map_err(|_| axum::headers::Error::invalid())?;
46
47 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
48 Ok(Self(bytes))
49 }
50
51 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
52 unimplemented!()
53 }
54}
55
56pub struct CloudflareIpCountryHeader(String);
57
58impl Header for CloudflareIpCountryHeader {
59 fn name() -> &'static HeaderName {
60 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
61 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
62 }
63
64 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
65 where
66 Self: Sized,
67 I: Iterator<Item = &'i axum::http::HeaderValue>,
68 {
69 let country_code = values
70 .next()
71 .ok_or_else(axum::headers::Error::invalid)?
72 .to_str()
73 .map_err(|_| axum::headers::Error::invalid())?;
74
75 Ok(Self(country_code.to_string()))
76 }
77
78 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
79 unimplemented!()
80 }
81}
82
83pub async fn post_crash(
84 Extension(app): Extension<Arc<AppState>>,
85 headers: HeaderMap,
86 body: Bytes,
87) -> Result<()> {
88 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
89
90 let report = IpsFile::parse(&body)?;
91 let version_threshold = SemanticVersion::new(0, 123, 0);
92
93 let bundle_id = &report.header.bundle_id;
94 let app_version = &report.app_version();
95
96 if bundle_id == "dev.zed.Zed-Dev" {
97 log::error!("Crash uploads from {} are ignored.", bundle_id);
98 return Ok(());
99 }
100
101 if app_version.is_none() || app_version.unwrap() < version_threshold {
102 log::error!(
103 "Crash uploads from {} are ignored.",
104 report.header.app_version
105 );
106 return Ok(());
107 }
108 let app_version = app_version.unwrap();
109
110 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
111 let response = blob_store_client
112 .head_object()
113 .bucket(CRASH_REPORTS_BUCKET)
114 .key(report.header.incident_id.clone() + ".ips")
115 .send()
116 .await;
117
118 if response.is_ok() {
119 log::info!("We've already uploaded this crash");
120 return Ok(());
121 }
122
123 blob_store_client
124 .put_object()
125 .bucket(CRASH_REPORTS_BUCKET)
126 .key(report.header.incident_id.clone() + ".ips")
127 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
128 .body(ByteStream::from(body.to_vec()))
129 .send()
130 .await
131 .map_err(|e| log::error!("Failed to upload crash: {}", e))
132 .ok();
133 }
134
135 let recent_panic_on: Option<i64> = headers
136 .get("x-zed-panicked-on")
137 .and_then(|h| h.to_str().ok())
138 .and_then(|s| s.parse().ok());
139 let mut recent_panic = None;
140
141 if let Some(recent_panic_on) = recent_panic_on {
142 let crashed_at = match report.timestamp() {
143 Ok(t) => Some(t),
144 Err(e) => {
145 log::error!("Can't parse {}: {}", report.header.timestamp, e);
146 None
147 }
148 };
149 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
150 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
151 }
152 }
153
154 let description = report.description(recent_panic);
155 let summary = report.backtrace_summary();
156
157 tracing::error!(
158 service = "client",
159 version = %report.header.app_version,
160 os_version = %report.header.os_version,
161 bundle_id = %report.header.bundle_id,
162 incident_id = %report.header.incident_id,
163 description = %description,
164 backtrace = %summary,
165 "crash report");
166
167 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
168 let payload = slack::WebhookBody::new(|w| {
169 w.add_section(|s| s.text(slack::Text::markdown(description)))
170 .add_section(|s| {
171 s.add_field(slack::Text::markdown(format!(
172 "*Version:*\n{} ({})",
173 bundle_id, app_version
174 )))
175 .add_field({
176 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
177 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
178 hostname.strip_prefix("http://").unwrap_or_default()
179 });
180
181 slack::Text::markdown(format!(
182 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
183 CRASH_REPORTS_BUCKET,
184 hostname,
185 report.header.incident_id,
186 report
187 .header
188 .incident_id
189 .chars()
190 .take(8)
191 .collect::<String>(),
192 ))
193 })
194 })
195 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
196 });
197 let payload_json = serde_json::to_string(&payload).map_err(|err| {
198 log::error!("Failed to serialize payload to JSON: {err}");
199 Error::Internal(anyhow!(err))
200 })?;
201
202 reqwest::Client::new()
203 .post(slack_panics_webhook)
204 .header("Content-Type", "application/json")
205 .body(payload_json)
206 .send()
207 .await
208 .map_err(|err| {
209 log::error!("Failed to send payload to Slack: {err}");
210 Error::Internal(anyhow!(err))
211 })?;
212 }
213
214 Ok(())
215}
216
217pub async fn post_events(
218 Extension(app): Extension<Arc<AppState>>,
219 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
220 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
221 body: Bytes,
222) -> Result<()> {
223 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
224 Err(Error::Http(
225 StatusCode::NOT_IMPLEMENTED,
226 "not supported".into(),
227 ))?
228 };
229
230 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
231 return Err(Error::Http(
232 StatusCode::INTERNAL_SERVER_ERROR,
233 "events not enabled".into(),
234 ))?;
235 };
236
237 let mut summer = Sha256::new();
238 summer.update(checksum_seed);
239 summer.update(&body);
240 summer.update(checksum_seed);
241
242 if &checksum != &summer.finalize()[..] {
243 return Err(Error::Http(
244 StatusCode::BAD_REQUEST,
245 "invalid checksum".into(),
246 ))?;
247 }
248
249 let request_body: telemetry_events::EventRequestBody =
250 serde_json::from_slice(&body).map_err(|err| {
251 log::error!("can't parse event json: {err}");
252 Error::Internal(anyhow!(err))
253 })?;
254
255 let mut to_upload = ToUpload::default();
256 let Some(last_event) = request_body.events.last() else {
257 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
258 };
259 let country_code = country_code_header.map(|h| h.0 .0);
260
261 let first_event_at = chrono::Utc::now()
262 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
263
264 for wrapper in &request_body.events {
265 match &wrapper.event {
266 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
267 event.clone(),
268 &wrapper,
269 &request_body,
270 first_event_at,
271 country_code.clone(),
272 )),
273 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
274 event.clone(),
275 &wrapper,
276 &request_body,
277 first_event_at,
278 country_code.clone(),
279 )),
280 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
281 event.clone(),
282 &wrapper,
283 &request_body,
284 first_event_at,
285 )),
286 Event::Assistant(event) => {
287 to_upload
288 .assistant_events
289 .push(AssistantEventRow::from_event(
290 event.clone(),
291 &wrapper,
292 &request_body,
293 first_event_at,
294 ))
295 }
296 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
297 event.clone(),
298 &wrapper,
299 &request_body,
300 first_event_at,
301 )),
302 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
303 event.clone(),
304 &wrapper,
305 &request_body,
306 first_event_at,
307 )),
308 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
309 event.clone(),
310 &wrapper,
311 &request_body,
312 first_event_at,
313 )),
314 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
315 event.clone(),
316 &wrapper,
317 &request_body,
318 first_event_at,
319 )),
320 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
321 event.clone(),
322 &wrapper,
323 &request_body,
324 first_event_at,
325 )),
326 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
327 event.clone(),
328 &wrapper,
329 &request_body,
330 first_event_at,
331 )),
332 Event::Extension(event) => {
333 let metadata = app
334 .db
335 .get_extension_version(&event.extension_id, &event.version)
336 .await?;
337 to_upload
338 .extension_events
339 .push(ExtensionEventRow::from_event(
340 event.clone(),
341 &wrapper,
342 &request_body,
343 metadata,
344 first_event_at,
345 ))
346 }
347 }
348 }
349
350 to_upload
351 .upload(&clickhouse_client)
352 .await
353 .map_err(|err| Error::Internal(anyhow!(err)))?;
354
355 Ok(())
356}
357
358#[derive(Default)]
359struct ToUpload {
360 editor_events: Vec<EditorEventRow>,
361 copilot_events: Vec<CopilotEventRow>,
362 assistant_events: Vec<AssistantEventRow>,
363 call_events: Vec<CallEventRow>,
364 cpu_events: Vec<CpuEventRow>,
365 memory_events: Vec<MemoryEventRow>,
366 app_events: Vec<AppEventRow>,
367 setting_events: Vec<SettingEventRow>,
368 extension_events: Vec<ExtensionEventRow>,
369 edit_events: Vec<EditEventRow>,
370 action_events: Vec<ActionEventRow>,
371}
372
373impl ToUpload {
374 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
375 const EDITOR_EVENTS_TABLE: &str = "editor_events";
376 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
377 .await
378 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
379
380 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
381 Self::upload_to_table(
382 COPILOT_EVENTS_TABLE,
383 &self.copilot_events,
384 clickhouse_client,
385 )
386 .await
387 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
388
389 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
390 Self::upload_to_table(
391 ASSISTANT_EVENTS_TABLE,
392 &self.assistant_events,
393 clickhouse_client,
394 )
395 .await
396 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
397
398 const CALL_EVENTS_TABLE: &str = "call_events";
399 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
400 .await
401 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
402
403 const CPU_EVENTS_TABLE: &str = "cpu_events";
404 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
405 .await
406 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
407
408 const MEMORY_EVENTS_TABLE: &str = "memory_events";
409 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
410 .await
411 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
412
413 const APP_EVENTS_TABLE: &str = "app_events";
414 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
415 .await
416 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
417
418 const SETTING_EVENTS_TABLE: &str = "setting_events";
419 Self::upload_to_table(
420 SETTING_EVENTS_TABLE,
421 &self.setting_events,
422 clickhouse_client,
423 )
424 .await
425 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
426
427 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
428 Self::upload_to_table(
429 EXTENSION_EVENTS_TABLE,
430 &self.extension_events,
431 clickhouse_client,
432 )
433 .await
434 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
435
436 const EDIT_EVENTS_TABLE: &str = "edit_events";
437 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
438 .await
439 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
440
441 const ACTION_EVENTS_TABLE: &str = "action_events";
442 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
443 .await
444 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
445
446 Ok(())
447 }
448
449 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
450 table: &str,
451 rows: &[T],
452 clickhouse_client: &clickhouse::Client,
453 ) -> anyhow::Result<()> {
454 if !rows.is_empty() {
455 let mut insert = clickhouse_client.insert(table)?;
456
457 for event in rows {
458 insert.write(event).await?;
459 }
460
461 insert.end().await?;
462
463 let event_count = rows.len();
464 log::info!(
465 "wrote {event_count} {event_specifier} to '{table}'",
466 event_specifier = if event_count == 1 { "event" } else { "events" }
467 );
468 }
469
470 Ok(())
471 }
472}
473
474pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
475where
476 S: Serializer,
477{
478 if country_code.len() != 2 {
479 use serde::ser::Error;
480 return Err(S::Error::custom(
481 "country_code must be exactly 2 characters",
482 ));
483 }
484
485 let country_code = country_code.as_bytes();
486
487 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
488}
489
490#[derive(Serialize, Debug, clickhouse::Row)]
491pub struct EditorEventRow {
492 pub installation_id: String,
493 pub operation: String,
494 pub app_version: String,
495 pub file_extension: String,
496 pub os_name: String,
497 pub os_version: String,
498 pub release_channel: String,
499 pub signed_in: bool,
500 pub vim_mode: bool,
501 #[serde(serialize_with = "serialize_country_code")]
502 pub country_code: String,
503 pub region_code: String,
504 pub city: String,
505 pub time: i64,
506 pub copilot_enabled: bool,
507 pub copilot_enabled_for_language: bool,
508 pub historical_event: bool,
509 pub architecture: String,
510 pub is_staff: Option<bool>,
511 pub session_id: Option<String>,
512 pub major: Option<i32>,
513 pub minor: Option<i32>,
514 pub patch: Option<i32>,
515}
516
517impl EditorEventRow {
518 fn from_event(
519 event: EditorEvent,
520 wrapper: &EventWrapper,
521 body: &EventRequestBody,
522 first_event_at: chrono::DateTime<chrono::Utc>,
523 country_code: Option<String>,
524 ) -> Self {
525 let semver = body.semver();
526 let time =
527 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
528
529 Self {
530 app_version: body.app_version.clone(),
531 major: semver.map(|v| v.major() as i32),
532 minor: semver.map(|v| v.minor() as i32),
533 patch: semver.map(|v| v.patch() as i32),
534 release_channel: body.release_channel.clone().unwrap_or_default(),
535 os_name: body.os_name.clone(),
536 os_version: body.os_version.clone().unwrap_or_default(),
537 architecture: body.architecture.clone(),
538 installation_id: body.installation_id.clone().unwrap_or_default(),
539 session_id: body.session_id.clone(),
540 is_staff: body.is_staff,
541 time: time.timestamp_millis(),
542 operation: event.operation,
543 file_extension: event.file_extension.unwrap_or_default(),
544 signed_in: wrapper.signed_in,
545 vim_mode: event.vim_mode,
546 copilot_enabled: event.copilot_enabled,
547 copilot_enabled_for_language: event.copilot_enabled_for_language,
548 country_code: country_code.unwrap_or("XX".to_string()),
549 region_code: "".to_string(),
550 city: "".to_string(),
551 historical_event: false,
552 }
553 }
554}
555
556#[derive(Serialize, Debug, clickhouse::Row)]
557pub struct CopilotEventRow {
558 pub installation_id: String,
559 pub suggestion_id: String,
560 pub suggestion_accepted: bool,
561 pub app_version: String,
562 pub file_extension: String,
563 pub os_name: String,
564 pub os_version: String,
565 pub release_channel: String,
566 pub signed_in: bool,
567 #[serde(serialize_with = "serialize_country_code")]
568 pub country_code: String,
569 pub region_code: String,
570 pub city: String,
571 pub time: i64,
572 pub is_staff: Option<bool>,
573 pub session_id: Option<String>,
574 pub major: Option<i32>,
575 pub minor: Option<i32>,
576 pub patch: Option<i32>,
577}
578
579impl CopilotEventRow {
580 fn from_event(
581 event: CopilotEvent,
582 wrapper: &EventWrapper,
583 body: &EventRequestBody,
584 first_event_at: chrono::DateTime<chrono::Utc>,
585 country_code: Option<String>,
586 ) -> Self {
587 let semver = body.semver();
588 let time =
589 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
590
591 Self {
592 app_version: body.app_version.clone(),
593 major: semver.map(|v| v.major() as i32),
594 minor: semver.map(|v| v.minor() as i32),
595 patch: semver.map(|v| v.patch() as i32),
596 release_channel: body.release_channel.clone().unwrap_or_default(),
597 os_name: body.os_name.clone(),
598 os_version: body.os_version.clone().unwrap_or_default(),
599 installation_id: body.installation_id.clone().unwrap_or_default(),
600 session_id: body.session_id.clone(),
601 is_staff: body.is_staff,
602 time: time.timestamp_millis(),
603 file_extension: event.file_extension.unwrap_or_default(),
604 signed_in: wrapper.signed_in,
605 country_code: country_code.unwrap_or("XX".to_string()),
606 region_code: "".to_string(),
607 city: "".to_string(),
608 suggestion_id: event.suggestion_id.unwrap_or_default(),
609 suggestion_accepted: event.suggestion_accepted,
610 }
611 }
612}
613
614#[derive(Serialize, Debug, clickhouse::Row)]
615pub struct CallEventRow {
616 // AppInfoBase
617 app_version: String,
618 major: Option<i32>,
619 minor: Option<i32>,
620 patch: Option<i32>,
621 release_channel: String,
622
623 // ClientEventBase
624 installation_id: String,
625 session_id: Option<String>,
626 is_staff: Option<bool>,
627 time: i64,
628
629 // CallEventRow
630 operation: String,
631 room_id: Option<u64>,
632 channel_id: Option<u64>,
633}
634
635impl CallEventRow {
636 fn from_event(
637 event: CallEvent,
638 wrapper: &EventWrapper,
639 body: &EventRequestBody,
640 first_event_at: chrono::DateTime<chrono::Utc>,
641 ) -> Self {
642 let semver = body.semver();
643 let time =
644 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
645
646 Self {
647 app_version: body.app_version.clone(),
648 major: semver.map(|v| v.major() as i32),
649 minor: semver.map(|v| v.minor() as i32),
650 patch: semver.map(|v| v.patch() as i32),
651 release_channel: body.release_channel.clone().unwrap_or_default(),
652 installation_id: body.installation_id.clone().unwrap_or_default(),
653 session_id: body.session_id.clone(),
654 is_staff: body.is_staff,
655 time: time.timestamp_millis(),
656 operation: event.operation,
657 room_id: event.room_id,
658 channel_id: event.channel_id,
659 }
660 }
661}
662
663#[derive(Serialize, Debug, clickhouse::Row)]
664pub struct AssistantEventRow {
665 // AppInfoBase
666 app_version: String,
667 major: Option<i32>,
668 minor: Option<i32>,
669 patch: Option<i32>,
670 release_channel: String,
671
672 // ClientEventBase
673 installation_id: Option<String>,
674 session_id: Option<String>,
675 is_staff: Option<bool>,
676 time: i64,
677
678 // AssistantEventRow
679 conversation_id: String,
680 kind: String,
681 model: String,
682}
683
684impl AssistantEventRow {
685 fn from_event(
686 event: AssistantEvent,
687 wrapper: &EventWrapper,
688 body: &EventRequestBody,
689 first_event_at: chrono::DateTime<chrono::Utc>,
690 ) -> Self {
691 let semver = body.semver();
692 let time =
693 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
694
695 Self {
696 app_version: body.app_version.clone(),
697 major: semver.map(|v| v.major() as i32),
698 minor: semver.map(|v| v.minor() as i32),
699 patch: semver.map(|v| v.patch() as i32),
700 release_channel: body.release_channel.clone().unwrap_or_default(),
701 installation_id: body.installation_id.clone(),
702 session_id: body.session_id.clone(),
703 is_staff: body.is_staff,
704 time: time.timestamp_millis(),
705 conversation_id: event.conversation_id.unwrap_or_default(),
706 kind: event.kind.to_string(),
707 model: event.model,
708 }
709 }
710}
711
712#[derive(Debug, clickhouse::Row, Serialize)]
713pub struct CpuEventRow {
714 pub installation_id: Option<String>,
715 pub is_staff: Option<bool>,
716 pub usage_as_percentage: f32,
717 pub core_count: u32,
718 pub app_version: String,
719 pub release_channel: String,
720 pub time: i64,
721 pub session_id: Option<String>,
722 // pub normalized_cpu_usage: f64, MATERIALIZED
723 pub major: Option<i32>,
724 pub minor: Option<i32>,
725 pub patch: Option<i32>,
726}
727
728impl CpuEventRow {
729 fn from_event(
730 event: CpuEvent,
731 wrapper: &EventWrapper,
732 body: &EventRequestBody,
733 first_event_at: chrono::DateTime<chrono::Utc>,
734 ) -> Self {
735 let semver = body.semver();
736 let time =
737 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
738
739 Self {
740 app_version: body.app_version.clone(),
741 major: semver.map(|v| v.major() as i32),
742 minor: semver.map(|v| v.minor() as i32),
743 patch: semver.map(|v| v.patch() as i32),
744 release_channel: body.release_channel.clone().unwrap_or_default(),
745 installation_id: body.installation_id.clone(),
746 session_id: body.session_id.clone(),
747 is_staff: body.is_staff,
748 time: time.timestamp_millis(),
749 usage_as_percentage: event.usage_as_percentage,
750 core_count: event.core_count,
751 }
752 }
753}
754
755#[derive(Serialize, Debug, clickhouse::Row)]
756pub struct MemoryEventRow {
757 // AppInfoBase
758 app_version: String,
759 major: Option<i32>,
760 minor: Option<i32>,
761 patch: Option<i32>,
762 release_channel: String,
763
764 // ClientEventBase
765 installation_id: Option<String>,
766 session_id: Option<String>,
767 is_staff: Option<bool>,
768 time: i64,
769
770 // MemoryEventRow
771 memory_in_bytes: u64,
772 virtual_memory_in_bytes: u64,
773}
774
775impl MemoryEventRow {
776 fn from_event(
777 event: MemoryEvent,
778 wrapper: &EventWrapper,
779 body: &EventRequestBody,
780 first_event_at: chrono::DateTime<chrono::Utc>,
781 ) -> Self {
782 let semver = body.semver();
783 let time =
784 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
785
786 Self {
787 app_version: body.app_version.clone(),
788 major: semver.map(|v| v.major() as i32),
789 minor: semver.map(|v| v.minor() as i32),
790 patch: semver.map(|v| v.patch() as i32),
791 release_channel: body.release_channel.clone().unwrap_or_default(),
792 installation_id: body.installation_id.clone(),
793 session_id: body.session_id.clone(),
794 is_staff: body.is_staff,
795 time: time.timestamp_millis(),
796 memory_in_bytes: event.memory_in_bytes,
797 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
798 }
799 }
800}
801
802#[derive(Serialize, Debug, clickhouse::Row)]
803pub struct AppEventRow {
804 // AppInfoBase
805 app_version: String,
806 major: Option<i32>,
807 minor: Option<i32>,
808 patch: Option<i32>,
809 release_channel: String,
810
811 // ClientEventBase
812 installation_id: Option<String>,
813 session_id: Option<String>,
814 is_staff: Option<bool>,
815 time: i64,
816
817 // AppEventRow
818 operation: String,
819}
820
821impl AppEventRow {
822 fn from_event(
823 event: AppEvent,
824 wrapper: &EventWrapper,
825 body: &EventRequestBody,
826 first_event_at: chrono::DateTime<chrono::Utc>,
827 ) -> Self {
828 let semver = body.semver();
829 let time =
830 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
831
832 Self {
833 app_version: body.app_version.clone(),
834 major: semver.map(|v| v.major() as i32),
835 minor: semver.map(|v| v.minor() as i32),
836 patch: semver.map(|v| v.patch() as i32),
837 release_channel: body.release_channel.clone().unwrap_or_default(),
838 installation_id: body.installation_id.clone(),
839 session_id: body.session_id.clone(),
840 is_staff: body.is_staff,
841 time: time.timestamp_millis(),
842 operation: event.operation,
843 }
844 }
845}
846
847#[derive(Serialize, Debug, clickhouse::Row)]
848pub struct SettingEventRow {
849 // AppInfoBase
850 app_version: String,
851 major: Option<i32>,
852 minor: Option<i32>,
853 patch: Option<i32>,
854 release_channel: String,
855
856 // ClientEventBase
857 installation_id: Option<String>,
858 session_id: Option<String>,
859 is_staff: Option<bool>,
860 time: i64,
861 // SettingEventRow
862 setting: String,
863 value: String,
864}
865
866impl SettingEventRow {
867 fn from_event(
868 event: SettingEvent,
869 wrapper: &EventWrapper,
870 body: &EventRequestBody,
871 first_event_at: chrono::DateTime<chrono::Utc>,
872 ) -> Self {
873 let semver = body.semver();
874 let time =
875 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
876
877 Self {
878 app_version: body.app_version.clone(),
879 major: semver.map(|v| v.major() as i32),
880 minor: semver.map(|v| v.minor() as i32),
881 patch: semver.map(|v| v.patch() as i32),
882 release_channel: body.release_channel.clone().unwrap_or_default(),
883 installation_id: body.installation_id.clone(),
884 session_id: body.session_id.clone(),
885 is_staff: body.is_staff,
886 time: time.timestamp_millis(),
887 setting: event.setting,
888 value: event.value,
889 }
890 }
891}
892
893#[derive(Serialize, Debug, clickhouse::Row)]
894pub struct ExtensionEventRow {
895 // AppInfoBase
896 app_version: String,
897 major: Option<i32>,
898 minor: Option<i32>,
899 patch: Option<i32>,
900 release_channel: String,
901
902 // ClientEventBase
903 installation_id: Option<String>,
904 session_id: Option<String>,
905 is_staff: Option<bool>,
906 time: i64,
907
908 // ExtensionEventRow
909 extension_id: Arc<str>,
910 extension_version: Arc<str>,
911 dev: bool,
912 schema_version: Option<i32>,
913 wasm_api_version: Option<String>,
914}
915
916impl ExtensionEventRow {
917 fn from_event(
918 event: ExtensionEvent,
919 wrapper: &EventWrapper,
920 body: &EventRequestBody,
921 extension_metadata: Option<ExtensionMetadata>,
922 first_event_at: chrono::DateTime<chrono::Utc>,
923 ) -> Self {
924 let semver = body.semver();
925 let time =
926 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
927
928 Self {
929 app_version: body.app_version.clone(),
930 major: semver.map(|v| v.major() as i32),
931 minor: semver.map(|v| v.minor() as i32),
932 patch: semver.map(|v| v.patch() as i32),
933 release_channel: body.release_channel.clone().unwrap_or_default(),
934 installation_id: body.installation_id.clone(),
935 session_id: body.session_id.clone(),
936 is_staff: body.is_staff,
937 time: time.timestamp_millis(),
938 extension_id: event.extension_id,
939 extension_version: event.version,
940 dev: extension_metadata.is_none(),
941 schema_version: extension_metadata
942 .as_ref()
943 .and_then(|metadata| metadata.manifest.schema_version),
944 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
945 metadata
946 .manifest
947 .wasm_api_version
948 .as_ref()
949 .map(|version| version.to_string())
950 }),
951 }
952 }
953}
954
955#[derive(Serialize, Debug, clickhouse::Row)]
956pub struct EditEventRow {
957 // AppInfoBase
958 app_version: String,
959 major: Option<i32>,
960 minor: Option<i32>,
961 patch: Option<i32>,
962 release_channel: String,
963
964 // ClientEventBase
965 installation_id: Option<String>,
966 // Note: This column name has a typo in the ClickHouse table.
967 #[serde(rename = "sesssion_id")]
968 session_id: Option<String>,
969 is_staff: Option<bool>,
970 time: i64,
971
972 // EditEventRow
973 period_start: i64,
974 period_end: i64,
975 environment: String,
976}
977
978impl EditEventRow {
979 fn from_event(
980 event: EditEvent,
981 wrapper: &EventWrapper,
982 body: &EventRequestBody,
983 first_event_at: chrono::DateTime<chrono::Utc>,
984 ) -> Self {
985 let semver = body.semver();
986 let time =
987 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
988
989 let period_start = time - chrono::Duration::milliseconds(event.duration);
990 let period_end = time;
991
992 Self {
993 app_version: body.app_version.clone(),
994 major: semver.map(|v| v.major() as i32),
995 minor: semver.map(|v| v.minor() as i32),
996 patch: semver.map(|v| v.patch() as i32),
997 release_channel: body.release_channel.clone().unwrap_or_default(),
998 installation_id: body.installation_id.clone(),
999 session_id: body.session_id.clone(),
1000 is_staff: body.is_staff,
1001 time: time.timestamp_millis(),
1002 period_start: period_start.timestamp_millis(),
1003 period_end: period_end.timestamp_millis(),
1004 environment: event.environment,
1005 }
1006 }
1007}
1008
1009#[derive(Serialize, Debug, clickhouse::Row)]
1010pub struct ActionEventRow {
1011 // AppInfoBase
1012 app_version: String,
1013 major: Option<i32>,
1014 minor: Option<i32>,
1015 patch: Option<i32>,
1016 release_channel: String,
1017
1018 // ClientEventBase
1019 installation_id: Option<String>,
1020 // Note: This column name has a typo in the ClickHouse table.
1021 #[serde(rename = "sesssion_id")]
1022 session_id: Option<String>,
1023 is_staff: Option<bool>,
1024 time: i64,
1025 // ActionEventRow
1026 source: String,
1027 action: String,
1028}
1029
1030impl ActionEventRow {
1031 fn from_event(
1032 event: ActionEvent,
1033 wrapper: &EventWrapper,
1034 body: &EventRequestBody,
1035 first_event_at: chrono::DateTime<chrono::Utc>,
1036 ) -> Self {
1037 let semver = body.semver();
1038 let time =
1039 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1040
1041 Self {
1042 app_version: body.app_version.clone(),
1043 major: semver.map(|v| v.major() as i32),
1044 minor: semver.map(|v| v.minor() as i32),
1045 patch: semver.map(|v| v.patch() as i32),
1046 release_channel: body.release_channel.clone().unwrap_or_default(),
1047 installation_id: body.installation_id.clone(),
1048 session_id: body.session_id.clone(),
1049 is_staff: body.is_staff,
1050 time: time.timestamp_millis(),
1051 source: event.source,
1052 action: event.action,
1053 }
1054 }
1055}