1use super::ips_file::IpsFile;
2use crate::{api::slack, AppState, Error, Result};
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use rpc::ExtensionMetadata;
13use semantic_version::SemanticVersion;
14use serde::{Serialize, Serializer};
15use sha2::{Digest, Sha256};
16use std::sync::{Arc, OnceLock};
17use telemetry_events::{
18 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
19 EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
20};
21
22pub fn router() -> Router {
23 Router::new()
24 .route("/telemetry/events", post(post_events))
25 .route("/telemetry/crashes", post(post_crash))
26}
27
28pub struct ZedChecksumHeader(Vec<u8>);
29
30impl Header for ZedChecksumHeader {
31 fn name() -> &'static HeaderName {
32 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
33 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
34 }
35
36 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
37 where
38 Self: Sized,
39 I: Iterator<Item = &'i axum::http::HeaderValue>,
40 {
41 let checksum = values
42 .next()
43 .ok_or_else(axum::headers::Error::invalid)?
44 .to_str()
45 .map_err(|_| axum::headers::Error::invalid())?;
46
47 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
48 Ok(Self(bytes))
49 }
50
51 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
52 unimplemented!()
53 }
54}
55
56pub struct CloudflareIpCountryHeader(String);
57
58impl Header for CloudflareIpCountryHeader {
59 fn name() -> &'static HeaderName {
60 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
61 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
62 }
63
64 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
65 where
66 Self: Sized,
67 I: Iterator<Item = &'i axum::http::HeaderValue>,
68 {
69 let country_code = values
70 .next()
71 .ok_or_else(axum::headers::Error::invalid)?
72 .to_str()
73 .map_err(|_| axum::headers::Error::invalid())?;
74
75 Ok(Self(country_code.to_string()))
76 }
77
78 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
79 unimplemented!()
80 }
81}
82
83pub async fn post_crash(
84 Extension(app): Extension<Arc<AppState>>,
85 headers: HeaderMap,
86 body: Bytes,
87) -> Result<()> {
88 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
89
90 let report = IpsFile::parse(&body)?;
91 let version_threshold = SemanticVersion::new(0, 123, 0);
92
93 let bundle_id = &report.header.bundle_id;
94 let app_version = &report.app_version();
95
96 if bundle_id == "dev.zed.Zed-Dev" {
97 log::error!("Crash uploads from {} are ignored.", bundle_id);
98 return Ok(());
99 }
100
101 if app_version.is_none() || app_version.unwrap() < version_threshold {
102 log::error!(
103 "Crash uploads from {} are ignored.",
104 report.header.app_version
105 );
106 return Ok(());
107 }
108 let app_version = app_version.unwrap();
109
110 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
111 let response = blob_store_client
112 .head_object()
113 .bucket(CRASH_REPORTS_BUCKET)
114 .key(report.header.incident_id.clone() + ".ips")
115 .send()
116 .await;
117
118 if response.is_ok() {
119 log::info!("We've already uploaded this crash");
120 return Ok(());
121 }
122
123 blob_store_client
124 .put_object()
125 .bucket(CRASH_REPORTS_BUCKET)
126 .key(report.header.incident_id.clone() + ".ips")
127 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
128 .body(ByteStream::from(body.to_vec()))
129 .send()
130 .await
131 .map_err(|e| log::error!("Failed to upload crash: {}", e))
132 .ok();
133 }
134
135 let recent_panic_on: Option<i64> = headers
136 .get("x-zed-panicked-on")
137 .and_then(|h| h.to_str().ok())
138 .and_then(|s| s.parse().ok());
139
140 let installation_id = headers
141 .get("x-zed-installation-id")
142 .and_then(|h| h.to_str().ok())
143 .map(|s| s.to_string())
144 .unwrap_or_default();
145
146 let mut recent_panic = None;
147
148 if let Some(recent_panic_on) = recent_panic_on {
149 let crashed_at = match report.timestamp() {
150 Ok(t) => Some(t),
151 Err(e) => {
152 log::error!("Can't parse {}: {}", report.header.timestamp, e);
153 None
154 }
155 };
156 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
157 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
158 }
159 }
160
161 let description = report.description(recent_panic);
162 let summary = report.backtrace_summary();
163
164 tracing::error!(
165 service = "client",
166 version = %report.header.app_version,
167 os_version = %report.header.os_version,
168 bundle_id = %report.header.bundle_id,
169 incident_id = %report.header.incident_id,
170 installation_id = %installation_id,
171 description = %description,
172 backtrace = %summary,
173 "crash report");
174
175 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
176 let payload = slack::WebhookBody::new(|w| {
177 w.add_section(|s| s.text(slack::Text::markdown(description)))
178 .add_section(|s| {
179 s.add_field(slack::Text::markdown(format!(
180 "*Version:*\n{} ({})",
181 bundle_id, app_version
182 )))
183 .add_field({
184 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
185 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
186 hostname.strip_prefix("http://").unwrap_or_default()
187 });
188
189 slack::Text::markdown(format!(
190 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
191 CRASH_REPORTS_BUCKET,
192 hostname,
193 report.header.incident_id,
194 report
195 .header
196 .incident_id
197 .chars()
198 .take(8)
199 .collect::<String>(),
200 ))
201 })
202 })
203 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
204 });
205 let payload_json = serde_json::to_string(&payload).map_err(|err| {
206 log::error!("Failed to serialize payload to JSON: {err}");
207 Error::Internal(anyhow!(err))
208 })?;
209
210 reqwest::Client::new()
211 .post(slack_panics_webhook)
212 .header("Content-Type", "application/json")
213 .body(payload_json)
214 .send()
215 .await
216 .map_err(|err| {
217 log::error!("Failed to send payload to Slack: {err}");
218 Error::Internal(anyhow!(err))
219 })?;
220 }
221
222 Ok(())
223}
224
225pub async fn post_events(
226 Extension(app): Extension<Arc<AppState>>,
227 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
228 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
229 body: Bytes,
230) -> Result<()> {
231 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
232 Err(Error::Http(
233 StatusCode::NOT_IMPLEMENTED,
234 "not supported".into(),
235 ))?
236 };
237
238 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
239 return Err(Error::Http(
240 StatusCode::INTERNAL_SERVER_ERROR,
241 "events not enabled".into(),
242 ))?;
243 };
244
245 let mut summer = Sha256::new();
246 summer.update(checksum_seed);
247 summer.update(&body);
248 summer.update(checksum_seed);
249
250 if &checksum != &summer.finalize()[..] {
251 return Err(Error::Http(
252 StatusCode::BAD_REQUEST,
253 "invalid checksum".into(),
254 ))?;
255 }
256
257 let request_body: telemetry_events::EventRequestBody =
258 serde_json::from_slice(&body).map_err(|err| {
259 log::error!("can't parse event json: {err}");
260 Error::Internal(anyhow!(err))
261 })?;
262
263 let mut to_upload = ToUpload::default();
264 let Some(last_event) = request_body.events.last() else {
265 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
266 };
267 let country_code = country_code_header.map(|h| h.0 .0);
268
269 let first_event_at = chrono::Utc::now()
270 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
271
272 for wrapper in &request_body.events {
273 match &wrapper.event {
274 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
275 event.clone(),
276 &wrapper,
277 &request_body,
278 first_event_at,
279 country_code.clone(),
280 )),
281 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
282 event.clone(),
283 &wrapper,
284 &request_body,
285 first_event_at,
286 country_code.clone(),
287 )),
288 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
289 event.clone(),
290 &wrapper,
291 &request_body,
292 first_event_at,
293 )),
294 Event::Assistant(event) => {
295 to_upload
296 .assistant_events
297 .push(AssistantEventRow::from_event(
298 event.clone(),
299 &wrapper,
300 &request_body,
301 first_event_at,
302 ))
303 }
304 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
305 event.clone(),
306 &wrapper,
307 &request_body,
308 first_event_at,
309 )),
310 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
311 event.clone(),
312 &wrapper,
313 &request_body,
314 first_event_at,
315 )),
316 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
317 event.clone(),
318 &wrapper,
319 &request_body,
320 first_event_at,
321 )),
322 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
323 event.clone(),
324 &wrapper,
325 &request_body,
326 first_event_at,
327 )),
328 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
329 event.clone(),
330 &wrapper,
331 &request_body,
332 first_event_at,
333 )),
334 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
335 event.clone(),
336 &wrapper,
337 &request_body,
338 first_event_at,
339 )),
340 Event::Extension(event) => {
341 let metadata = app
342 .db
343 .get_extension_version(&event.extension_id, &event.version)
344 .await?;
345 to_upload
346 .extension_events
347 .push(ExtensionEventRow::from_event(
348 event.clone(),
349 &wrapper,
350 &request_body,
351 metadata,
352 first_event_at,
353 ))
354 }
355 }
356 }
357
358 to_upload
359 .upload(&clickhouse_client)
360 .await
361 .map_err(|err| Error::Internal(anyhow!(err)))?;
362
363 Ok(())
364}
365
366#[derive(Default)]
367struct ToUpload {
368 editor_events: Vec<EditorEventRow>,
369 copilot_events: Vec<CopilotEventRow>,
370 assistant_events: Vec<AssistantEventRow>,
371 call_events: Vec<CallEventRow>,
372 cpu_events: Vec<CpuEventRow>,
373 memory_events: Vec<MemoryEventRow>,
374 app_events: Vec<AppEventRow>,
375 setting_events: Vec<SettingEventRow>,
376 extension_events: Vec<ExtensionEventRow>,
377 edit_events: Vec<EditEventRow>,
378 action_events: Vec<ActionEventRow>,
379}
380
381impl ToUpload {
382 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
383 const EDITOR_EVENTS_TABLE: &str = "editor_events";
384 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
385 .await
386 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
387
388 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
389 Self::upload_to_table(
390 COPILOT_EVENTS_TABLE,
391 &self.copilot_events,
392 clickhouse_client,
393 )
394 .await
395 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
396
397 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
398 Self::upload_to_table(
399 ASSISTANT_EVENTS_TABLE,
400 &self.assistant_events,
401 clickhouse_client,
402 )
403 .await
404 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
405
406 const CALL_EVENTS_TABLE: &str = "call_events";
407 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
408 .await
409 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
410
411 const CPU_EVENTS_TABLE: &str = "cpu_events";
412 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
413 .await
414 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
415
416 const MEMORY_EVENTS_TABLE: &str = "memory_events";
417 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
418 .await
419 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
420
421 const APP_EVENTS_TABLE: &str = "app_events";
422 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
423 .await
424 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
425
426 const SETTING_EVENTS_TABLE: &str = "setting_events";
427 Self::upload_to_table(
428 SETTING_EVENTS_TABLE,
429 &self.setting_events,
430 clickhouse_client,
431 )
432 .await
433 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
434
435 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
436 Self::upload_to_table(
437 EXTENSION_EVENTS_TABLE,
438 &self.extension_events,
439 clickhouse_client,
440 )
441 .await
442 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
443
444 const EDIT_EVENTS_TABLE: &str = "edit_events";
445 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
446 .await
447 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
448
449 const ACTION_EVENTS_TABLE: &str = "action_events";
450 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
451 .await
452 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
453
454 Ok(())
455 }
456
457 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
458 table: &str,
459 rows: &[T],
460 clickhouse_client: &clickhouse::Client,
461 ) -> anyhow::Result<()> {
462 if !rows.is_empty() {
463 let mut insert = clickhouse_client.insert(table)?;
464
465 for event in rows {
466 insert.write(event).await?;
467 }
468
469 insert.end().await?;
470
471 let event_count = rows.len();
472 log::info!(
473 "wrote {event_count} {event_specifier} to '{table}'",
474 event_specifier = if event_count == 1 { "event" } else { "events" }
475 );
476 }
477
478 Ok(())
479 }
480}
481
482pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
483where
484 S: Serializer,
485{
486 if country_code.len() != 2 {
487 use serde::ser::Error;
488 return Err(S::Error::custom(
489 "country_code must be exactly 2 characters",
490 ));
491 }
492
493 let country_code = country_code.as_bytes();
494
495 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
496}
497
498#[derive(Serialize, Debug, clickhouse::Row)]
499pub struct EditorEventRow {
500 pub installation_id: String,
501 pub operation: String,
502 pub app_version: String,
503 pub file_extension: String,
504 pub os_name: String,
505 pub os_version: String,
506 pub release_channel: String,
507 pub signed_in: bool,
508 pub vim_mode: bool,
509 #[serde(serialize_with = "serialize_country_code")]
510 pub country_code: String,
511 pub region_code: String,
512 pub city: String,
513 pub time: i64,
514 pub copilot_enabled: bool,
515 pub copilot_enabled_for_language: bool,
516 pub historical_event: bool,
517 pub architecture: String,
518 pub is_staff: Option<bool>,
519 pub session_id: Option<String>,
520 pub major: Option<i32>,
521 pub minor: Option<i32>,
522 pub patch: Option<i32>,
523}
524
525impl EditorEventRow {
526 fn from_event(
527 event: EditorEvent,
528 wrapper: &EventWrapper,
529 body: &EventRequestBody,
530 first_event_at: chrono::DateTime<chrono::Utc>,
531 country_code: Option<String>,
532 ) -> Self {
533 let semver = body.semver();
534 let time =
535 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
536
537 Self {
538 app_version: body.app_version.clone(),
539 major: semver.map(|v| v.major() as i32),
540 minor: semver.map(|v| v.minor() as i32),
541 patch: semver.map(|v| v.patch() as i32),
542 release_channel: body.release_channel.clone().unwrap_or_default(),
543 os_name: body.os_name.clone(),
544 os_version: body.os_version.clone().unwrap_or_default(),
545 architecture: body.architecture.clone(),
546 installation_id: body.installation_id.clone().unwrap_or_default(),
547 session_id: body.session_id.clone(),
548 is_staff: body.is_staff,
549 time: time.timestamp_millis(),
550 operation: event.operation,
551 file_extension: event.file_extension.unwrap_or_default(),
552 signed_in: wrapper.signed_in,
553 vim_mode: event.vim_mode,
554 copilot_enabled: event.copilot_enabled,
555 copilot_enabled_for_language: event.copilot_enabled_for_language,
556 country_code: country_code.unwrap_or("XX".to_string()),
557 region_code: "".to_string(),
558 city: "".to_string(),
559 historical_event: false,
560 }
561 }
562}
563
564#[derive(Serialize, Debug, clickhouse::Row)]
565pub struct CopilotEventRow {
566 pub installation_id: String,
567 pub suggestion_id: String,
568 pub suggestion_accepted: bool,
569 pub app_version: String,
570 pub file_extension: String,
571 pub os_name: String,
572 pub os_version: String,
573 pub release_channel: String,
574 pub signed_in: bool,
575 #[serde(serialize_with = "serialize_country_code")]
576 pub country_code: String,
577 pub region_code: String,
578 pub city: String,
579 pub time: i64,
580 pub is_staff: Option<bool>,
581 pub session_id: Option<String>,
582 pub major: Option<i32>,
583 pub minor: Option<i32>,
584 pub patch: Option<i32>,
585}
586
587impl CopilotEventRow {
588 fn from_event(
589 event: CopilotEvent,
590 wrapper: &EventWrapper,
591 body: &EventRequestBody,
592 first_event_at: chrono::DateTime<chrono::Utc>,
593 country_code: Option<String>,
594 ) -> Self {
595 let semver = body.semver();
596 let time =
597 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
598
599 Self {
600 app_version: body.app_version.clone(),
601 major: semver.map(|v| v.major() as i32),
602 minor: semver.map(|v| v.minor() as i32),
603 patch: semver.map(|v| v.patch() as i32),
604 release_channel: body.release_channel.clone().unwrap_or_default(),
605 os_name: body.os_name.clone(),
606 os_version: body.os_version.clone().unwrap_or_default(),
607 installation_id: body.installation_id.clone().unwrap_or_default(),
608 session_id: body.session_id.clone(),
609 is_staff: body.is_staff,
610 time: time.timestamp_millis(),
611 file_extension: event.file_extension.unwrap_or_default(),
612 signed_in: wrapper.signed_in,
613 country_code: country_code.unwrap_or("XX".to_string()),
614 region_code: "".to_string(),
615 city: "".to_string(),
616 suggestion_id: event.suggestion_id.unwrap_or_default(),
617 suggestion_accepted: event.suggestion_accepted,
618 }
619 }
620}
621
622#[derive(Serialize, Debug, clickhouse::Row)]
623pub struct CallEventRow {
624 // AppInfoBase
625 app_version: String,
626 major: Option<i32>,
627 minor: Option<i32>,
628 patch: Option<i32>,
629 release_channel: String,
630
631 // ClientEventBase
632 installation_id: String,
633 session_id: Option<String>,
634 is_staff: Option<bool>,
635 time: i64,
636
637 // CallEventRow
638 operation: String,
639 room_id: Option<u64>,
640 channel_id: Option<u64>,
641}
642
643impl CallEventRow {
644 fn from_event(
645 event: CallEvent,
646 wrapper: &EventWrapper,
647 body: &EventRequestBody,
648 first_event_at: chrono::DateTime<chrono::Utc>,
649 ) -> Self {
650 let semver = body.semver();
651 let time =
652 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
653
654 Self {
655 app_version: body.app_version.clone(),
656 major: semver.map(|v| v.major() as i32),
657 minor: semver.map(|v| v.minor() as i32),
658 patch: semver.map(|v| v.patch() as i32),
659 release_channel: body.release_channel.clone().unwrap_or_default(),
660 installation_id: body.installation_id.clone().unwrap_or_default(),
661 session_id: body.session_id.clone(),
662 is_staff: body.is_staff,
663 time: time.timestamp_millis(),
664 operation: event.operation,
665 room_id: event.room_id,
666 channel_id: event.channel_id,
667 }
668 }
669}
670
671#[derive(Serialize, Debug, clickhouse::Row)]
672pub struct AssistantEventRow {
673 // AppInfoBase
674 app_version: String,
675 major: Option<i32>,
676 minor: Option<i32>,
677 patch: Option<i32>,
678 release_channel: String,
679
680 // ClientEventBase
681 installation_id: Option<String>,
682 session_id: Option<String>,
683 is_staff: Option<bool>,
684 time: i64,
685
686 // AssistantEventRow
687 conversation_id: String,
688 kind: String,
689 model: String,
690}
691
692impl AssistantEventRow {
693 fn from_event(
694 event: AssistantEvent,
695 wrapper: &EventWrapper,
696 body: &EventRequestBody,
697 first_event_at: chrono::DateTime<chrono::Utc>,
698 ) -> Self {
699 let semver = body.semver();
700 let time =
701 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
702
703 Self {
704 app_version: body.app_version.clone(),
705 major: semver.map(|v| v.major() as i32),
706 minor: semver.map(|v| v.minor() as i32),
707 patch: semver.map(|v| v.patch() as i32),
708 release_channel: body.release_channel.clone().unwrap_or_default(),
709 installation_id: body.installation_id.clone(),
710 session_id: body.session_id.clone(),
711 is_staff: body.is_staff,
712 time: time.timestamp_millis(),
713 conversation_id: event.conversation_id.unwrap_or_default(),
714 kind: event.kind.to_string(),
715 model: event.model,
716 }
717 }
718}
719
720#[derive(Debug, clickhouse::Row, Serialize)]
721pub struct CpuEventRow {
722 pub installation_id: Option<String>,
723 pub is_staff: Option<bool>,
724 pub usage_as_percentage: f32,
725 pub core_count: u32,
726 pub app_version: String,
727 pub release_channel: String,
728 pub time: i64,
729 pub session_id: Option<String>,
730 // pub normalized_cpu_usage: f64, MATERIALIZED
731 pub major: Option<i32>,
732 pub minor: Option<i32>,
733 pub patch: Option<i32>,
734}
735
736impl CpuEventRow {
737 fn from_event(
738 event: CpuEvent,
739 wrapper: &EventWrapper,
740 body: &EventRequestBody,
741 first_event_at: chrono::DateTime<chrono::Utc>,
742 ) -> Self {
743 let semver = body.semver();
744 let time =
745 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
746
747 Self {
748 app_version: body.app_version.clone(),
749 major: semver.map(|v| v.major() as i32),
750 minor: semver.map(|v| v.minor() as i32),
751 patch: semver.map(|v| v.patch() as i32),
752 release_channel: body.release_channel.clone().unwrap_or_default(),
753 installation_id: body.installation_id.clone(),
754 session_id: body.session_id.clone(),
755 is_staff: body.is_staff,
756 time: time.timestamp_millis(),
757 usage_as_percentage: event.usage_as_percentage,
758 core_count: event.core_count,
759 }
760 }
761}
762
763#[derive(Serialize, Debug, clickhouse::Row)]
764pub struct MemoryEventRow {
765 // AppInfoBase
766 app_version: String,
767 major: Option<i32>,
768 minor: Option<i32>,
769 patch: Option<i32>,
770 release_channel: String,
771
772 // ClientEventBase
773 installation_id: Option<String>,
774 session_id: Option<String>,
775 is_staff: Option<bool>,
776 time: i64,
777
778 // MemoryEventRow
779 memory_in_bytes: u64,
780 virtual_memory_in_bytes: u64,
781}
782
783impl MemoryEventRow {
784 fn from_event(
785 event: MemoryEvent,
786 wrapper: &EventWrapper,
787 body: &EventRequestBody,
788 first_event_at: chrono::DateTime<chrono::Utc>,
789 ) -> Self {
790 let semver = body.semver();
791 let time =
792 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
793
794 Self {
795 app_version: body.app_version.clone(),
796 major: semver.map(|v| v.major() as i32),
797 minor: semver.map(|v| v.minor() as i32),
798 patch: semver.map(|v| v.patch() as i32),
799 release_channel: body.release_channel.clone().unwrap_or_default(),
800 installation_id: body.installation_id.clone(),
801 session_id: body.session_id.clone(),
802 is_staff: body.is_staff,
803 time: time.timestamp_millis(),
804 memory_in_bytes: event.memory_in_bytes,
805 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
806 }
807 }
808}
809
810#[derive(Serialize, Debug, clickhouse::Row)]
811pub struct AppEventRow {
812 // AppInfoBase
813 app_version: String,
814 major: Option<i32>,
815 minor: Option<i32>,
816 patch: Option<i32>,
817 release_channel: String,
818
819 // ClientEventBase
820 installation_id: Option<String>,
821 session_id: Option<String>,
822 is_staff: Option<bool>,
823 time: i64,
824
825 // AppEventRow
826 operation: String,
827}
828
829impl AppEventRow {
830 fn from_event(
831 event: AppEvent,
832 wrapper: &EventWrapper,
833 body: &EventRequestBody,
834 first_event_at: chrono::DateTime<chrono::Utc>,
835 ) -> Self {
836 let semver = body.semver();
837 let time =
838 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
839
840 Self {
841 app_version: body.app_version.clone(),
842 major: semver.map(|v| v.major() as i32),
843 minor: semver.map(|v| v.minor() as i32),
844 patch: semver.map(|v| v.patch() as i32),
845 release_channel: body.release_channel.clone().unwrap_or_default(),
846 installation_id: body.installation_id.clone(),
847 session_id: body.session_id.clone(),
848 is_staff: body.is_staff,
849 time: time.timestamp_millis(),
850 operation: event.operation,
851 }
852 }
853}
854
855#[derive(Serialize, Debug, clickhouse::Row)]
856pub struct SettingEventRow {
857 // AppInfoBase
858 app_version: String,
859 major: Option<i32>,
860 minor: Option<i32>,
861 patch: Option<i32>,
862 release_channel: String,
863
864 // ClientEventBase
865 installation_id: Option<String>,
866 session_id: Option<String>,
867 is_staff: Option<bool>,
868 time: i64,
869 // SettingEventRow
870 setting: String,
871 value: String,
872}
873
874impl SettingEventRow {
875 fn from_event(
876 event: SettingEvent,
877 wrapper: &EventWrapper,
878 body: &EventRequestBody,
879 first_event_at: chrono::DateTime<chrono::Utc>,
880 ) -> Self {
881 let semver = body.semver();
882 let time =
883 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
884
885 Self {
886 app_version: body.app_version.clone(),
887 major: semver.map(|v| v.major() as i32),
888 minor: semver.map(|v| v.minor() as i32),
889 patch: semver.map(|v| v.patch() as i32),
890 release_channel: body.release_channel.clone().unwrap_or_default(),
891 installation_id: body.installation_id.clone(),
892 session_id: body.session_id.clone(),
893 is_staff: body.is_staff,
894 time: time.timestamp_millis(),
895 setting: event.setting,
896 value: event.value,
897 }
898 }
899}
900
901#[derive(Serialize, Debug, clickhouse::Row)]
902pub struct ExtensionEventRow {
903 // AppInfoBase
904 app_version: String,
905 major: Option<i32>,
906 minor: Option<i32>,
907 patch: Option<i32>,
908 release_channel: String,
909
910 // ClientEventBase
911 installation_id: Option<String>,
912 session_id: Option<String>,
913 is_staff: Option<bool>,
914 time: i64,
915
916 // ExtensionEventRow
917 extension_id: Arc<str>,
918 extension_version: Arc<str>,
919 dev: bool,
920 schema_version: Option<i32>,
921 wasm_api_version: Option<String>,
922}
923
924impl ExtensionEventRow {
925 fn from_event(
926 event: ExtensionEvent,
927 wrapper: &EventWrapper,
928 body: &EventRequestBody,
929 extension_metadata: Option<ExtensionMetadata>,
930 first_event_at: chrono::DateTime<chrono::Utc>,
931 ) -> Self {
932 let semver = body.semver();
933 let time =
934 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
935
936 Self {
937 app_version: body.app_version.clone(),
938 major: semver.map(|v| v.major() as i32),
939 minor: semver.map(|v| v.minor() as i32),
940 patch: semver.map(|v| v.patch() as i32),
941 release_channel: body.release_channel.clone().unwrap_or_default(),
942 installation_id: body.installation_id.clone(),
943 session_id: body.session_id.clone(),
944 is_staff: body.is_staff,
945 time: time.timestamp_millis(),
946 extension_id: event.extension_id,
947 extension_version: event.version,
948 dev: extension_metadata.is_none(),
949 schema_version: extension_metadata
950 .as_ref()
951 .and_then(|metadata| metadata.manifest.schema_version),
952 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
953 metadata
954 .manifest
955 .wasm_api_version
956 .as_ref()
957 .map(|version| version.to_string())
958 }),
959 }
960 }
961}
962
963#[derive(Serialize, Debug, clickhouse::Row)]
964pub struct EditEventRow {
965 // AppInfoBase
966 app_version: String,
967 major: Option<i32>,
968 minor: Option<i32>,
969 patch: Option<i32>,
970 release_channel: String,
971
972 // ClientEventBase
973 installation_id: Option<String>,
974 // Note: This column name has a typo in the ClickHouse table.
975 #[serde(rename = "sesssion_id")]
976 session_id: Option<String>,
977 is_staff: Option<bool>,
978 time: i64,
979
980 // EditEventRow
981 period_start: i64,
982 period_end: i64,
983 environment: String,
984}
985
986impl EditEventRow {
987 fn from_event(
988 event: EditEvent,
989 wrapper: &EventWrapper,
990 body: &EventRequestBody,
991 first_event_at: chrono::DateTime<chrono::Utc>,
992 ) -> Self {
993 let semver = body.semver();
994 let time =
995 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
996
997 let period_start = time - chrono::Duration::milliseconds(event.duration);
998 let period_end = time;
999
1000 Self {
1001 app_version: body.app_version.clone(),
1002 major: semver.map(|v| v.major() as i32),
1003 minor: semver.map(|v| v.minor() as i32),
1004 patch: semver.map(|v| v.patch() as i32),
1005 release_channel: body.release_channel.clone().unwrap_or_default(),
1006 installation_id: body.installation_id.clone(),
1007 session_id: body.session_id.clone(),
1008 is_staff: body.is_staff,
1009 time: time.timestamp_millis(),
1010 period_start: period_start.timestamp_millis(),
1011 period_end: period_end.timestamp_millis(),
1012 environment: event.environment,
1013 }
1014 }
1015}
1016
1017#[derive(Serialize, Debug, clickhouse::Row)]
1018pub struct ActionEventRow {
1019 // AppInfoBase
1020 app_version: String,
1021 major: Option<i32>,
1022 minor: Option<i32>,
1023 patch: Option<i32>,
1024 release_channel: String,
1025
1026 // ClientEventBase
1027 installation_id: Option<String>,
1028 // Note: This column name has a typo in the ClickHouse table.
1029 #[serde(rename = "sesssion_id")]
1030 session_id: Option<String>,
1031 is_staff: Option<bool>,
1032 time: i64,
1033 // ActionEventRow
1034 source: String,
1035 action: String,
1036}
1037
1038impl ActionEventRow {
1039 fn from_event(
1040 event: ActionEvent,
1041 wrapper: &EventWrapper,
1042 body: &EventRequestBody,
1043 first_event_at: chrono::DateTime<chrono::Utc>,
1044 ) -> Self {
1045 let semver = body.semver();
1046 let time =
1047 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1048
1049 Self {
1050 app_version: body.app_version.clone(),
1051 major: semver.map(|v| v.major() as i32),
1052 minor: semver.map(|v| v.minor() as i32),
1053 patch: semver.map(|v| v.patch() as i32),
1054 release_channel: body.release_channel.clone().unwrap_or_default(),
1055 installation_id: body.installation_id.clone(),
1056 session_id: body.session_id.clone(),
1057 is_staff: body.is_staff,
1058 time: time.timestamp_millis(),
1059 source: event.source,
1060 action: event.action,
1061 }
1062 }
1063}