1use super::ips_file::IpsFile;
2use crate::api::CloudflareIpCountryHeader;
3use crate::clickhouse::write_to_table;
4use crate::{api::slack, AppState, Error, Result};
5use anyhow::{anyhow, Context};
6use aws_sdk_s3::primitives::ByteStream;
7use axum::{
8 body::Bytes,
9 headers::Header,
10 http::{HeaderMap, HeaderName, StatusCode},
11 routing::post,
12 Extension, Router, TypedHeader,
13};
14use rpc::ExtensionMetadata;
15use semantic_version::SemanticVersion;
16use serde::{Serialize, Serializer};
17use sha2::{Digest, Sha256};
18use std::sync::{Arc, OnceLock};
19use telemetry_events::{
20 ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
21 EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic,
22 ReplEvent, SettingEvent,
23};
24use uuid::Uuid;
25
/// Name of the blob-store bucket that crash (`.ips`) and hang (`.hang.json`) reports are
/// uploaded to; it is also used to build the incident link posted to Slack.
static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
27
28pub fn router() -> Router {
29 Router::new()
30 .route("/telemetry/events", post(post_events))
31 .route("/telemetry/crashes", post(post_crash))
32 .route("/telemetry/panics", post(post_panic))
33 .route("/telemetry/hangs", post(post_hang))
34}
35
36pub struct ZedChecksumHeader(Vec<u8>);
37
38impl Header for ZedChecksumHeader {
39 fn name() -> &'static HeaderName {
40 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
41 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
42 }
43
44 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
45 where
46 Self: Sized,
47 I: Iterator<Item = &'i axum::http::HeaderValue>,
48 {
49 let checksum = values
50 .next()
51 .ok_or_else(axum::headers::Error::invalid)?
52 .to_str()
53 .map_err(|_| axum::headers::Error::invalid())?;
54
55 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
56 Ok(Self(bytes))
57 }
58
59 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
60 unimplemented!()
61 }
62}
63
64pub async fn post_crash(
65 Extension(app): Extension<Arc<AppState>>,
66 headers: HeaderMap,
67 body: Bytes,
68) -> Result<()> {
69 let report = IpsFile::parse(&body)?;
70 let version_threshold = SemanticVersion::new(0, 123, 0);
71
72 let bundle_id = &report.header.bundle_id;
73 let app_version = &report.app_version();
74
75 if bundle_id == "dev.zed.Zed-Dev" {
76 log::error!("Crash uploads from {} are ignored.", bundle_id);
77 return Ok(());
78 }
79
80 if app_version.is_none() || app_version.unwrap() < version_threshold {
81 log::error!(
82 "Crash uploads from {} are ignored.",
83 report.header.app_version
84 );
85 return Ok(());
86 }
87 let app_version = app_version.unwrap();
88
89 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
90 let response = blob_store_client
91 .head_object()
92 .bucket(CRASH_REPORTS_BUCKET)
93 .key(report.header.incident_id.clone() + ".ips")
94 .send()
95 .await;
96
97 if response.is_ok() {
98 log::info!("We've already uploaded this crash");
99 return Ok(());
100 }
101
102 blob_store_client
103 .put_object()
104 .bucket(CRASH_REPORTS_BUCKET)
105 .key(report.header.incident_id.clone() + ".ips")
106 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
107 .body(ByteStream::from(body.to_vec()))
108 .send()
109 .await
110 .map_err(|e| log::error!("Failed to upload crash: {}", e))
111 .ok();
112 }
113
114 let recent_panic_on: Option<i64> = headers
115 .get("x-zed-panicked-on")
116 .and_then(|h| h.to_str().ok())
117 .and_then(|s| s.parse().ok());
118
119 let installation_id = headers
120 .get("x-zed-installation-id")
121 .and_then(|h| h.to_str().ok())
122 .map(|s| s.to_string())
123 .unwrap_or_default();
124
125 let mut recent_panic = None;
126
127 if let Some(recent_panic_on) = recent_panic_on {
128 let crashed_at = match report.timestamp() {
129 Ok(t) => Some(t),
130 Err(e) => {
131 log::error!("Can't parse {}: {}", report.header.timestamp, e);
132 None
133 }
134 };
135 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
136 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
137 }
138 }
139
140 let description = report.description(recent_panic);
141 let summary = report.backtrace_summary();
142
143 tracing::error!(
144 service = "client",
145 version = %report.header.app_version,
146 os_version = %report.header.os_version,
147 bundle_id = %report.header.bundle_id,
148 incident_id = %report.header.incident_id,
149 installation_id = %installation_id,
150 description = %description,
151 backtrace = %summary,
152 "crash report"
153 );
154
155 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
156 let payload = slack::WebhookBody::new(|w| {
157 w.add_section(|s| s.text(slack::Text::markdown(description)))
158 .add_section(|s| {
159 s.add_field(slack::Text::markdown(format!(
160 "*Version:*\n{} ({})",
161 bundle_id, app_version
162 )))
163 .add_field({
164 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
165 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
166 hostname.strip_prefix("http://").unwrap_or_default()
167 });
168
169 slack::Text::markdown(format!(
170 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
171 CRASH_REPORTS_BUCKET,
172 hostname,
173 report.header.incident_id,
174 report
175 .header
176 .incident_id
177 .chars()
178 .take(8)
179 .collect::<String>(),
180 ))
181 })
182 })
183 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
184 });
185 let payload_json = serde_json::to_string(&payload).map_err(|err| {
186 log::error!("Failed to serialize payload to JSON: {err}");
187 Error::Internal(anyhow!(err))
188 })?;
189
190 reqwest::Client::new()
191 .post(slack_panics_webhook)
192 .header("Content-Type", "application/json")
193 .body(payload_json)
194 .send()
195 .await
196 .map_err(|err| {
197 log::error!("Failed to send payload to Slack: {err}");
198 Error::Internal(anyhow!(err))
199 })?;
200 }
201
202 Ok(())
203}
204
205pub async fn post_hang(
206 Extension(app): Extension<Arc<AppState>>,
207 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
208 body: Bytes,
209) -> Result<()> {
210 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
211 return Err(Error::http(
212 StatusCode::INTERNAL_SERVER_ERROR,
213 "events not enabled".into(),
214 ))?;
215 };
216
217 if checksum != expected {
218 return Err(Error::http(
219 StatusCode::BAD_REQUEST,
220 "invalid checksum".into(),
221 ))?;
222 }
223
224 let incident_id = Uuid::new_v4().to_string();
225
226 // dump JSON into S3 so we can get frame offsets if we need to.
227 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
228 blob_store_client
229 .put_object()
230 .bucket(CRASH_REPORTS_BUCKET)
231 .key(incident_id.clone() + ".hang.json")
232 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
233 .body(ByteStream::from(body.to_vec()))
234 .send()
235 .await
236 .map_err(|e| log::error!("Failed to upload crash: {}", e))
237 .ok();
238 }
239
240 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
241 log::error!("can't parse report json: {err}");
242 Error::Internal(anyhow!(err))
243 })?;
244
245 let mut backtrace = "Possible hang detected on main thread:".to_string();
246 let unknown = "<unknown>".to_string();
247 for frame in report.backtrace.iter() {
248 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
249 }
250
251 tracing::error!(
252 service = "client",
253 version = %report.app_version.unwrap_or_default().to_string(),
254 os_name = %report.os_name,
255 os_version = report.os_version.unwrap_or_default().to_string(),
256 incident_id = %incident_id,
257 installation_id = %report.installation_id.unwrap_or_default(),
258 backtrace = %backtrace,
259 "hang report");
260
261 Ok(())
262}
263
264pub async fn post_panic(
265 Extension(app): Extension<Arc<AppState>>,
266 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
267 body: Bytes,
268) -> Result<()> {
269 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
270 return Err(Error::http(
271 StatusCode::INTERNAL_SERVER_ERROR,
272 "events not enabled".into(),
273 ))?;
274 };
275
276 if checksum != expected {
277 return Err(Error::http(
278 StatusCode::BAD_REQUEST,
279 "invalid checksum".into(),
280 ))?;
281 }
282
283 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
284 .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
285 let panic = report.panic;
286
287 if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
288 return Err(Error::http(
289 StatusCode::BAD_REQUEST,
290 "invalid os version".into(),
291 ))?;
292 }
293
294 tracing::error!(
295 service = "client",
296 version = %panic.app_version,
297 os_name = %panic.os_name,
298 os_version = %panic.os_version.clone().unwrap_or_default(),
299 installation_id = %panic.installation_id.clone().unwrap_or_default(),
300 description = %panic.payload,
301 backtrace = %panic.backtrace.join("\n"),
302 "panic report"
303 );
304
305 let backtrace = if panic.backtrace.len() > 25 {
306 let total = panic.backtrace.len();
307 format!(
308 "{}\n and {} more",
309 panic
310 .backtrace
311 .iter()
312 .take(20)
313 .cloned()
314 .collect::<Vec<_>>()
315 .join("\n"),
316 total - 20
317 )
318 } else {
319 panic.backtrace.join("\n")
320 };
321
322 if !report_to_slack(&panic) {
323 return Ok(());
324 }
325
326 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
327
328 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
329 let payload = slack::WebhookBody::new(|w| {
330 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
331 .add_section(|s| {
332 s.add_field(slack::Text::markdown(format!(
333 "*Version:*\n {} ",
334 panic.app_version
335 )))
336 .add_field({
337 slack::Text::markdown(format!(
338 "*OS:*\n{} {}",
339 panic.os_name,
340 panic.os_version.unwrap_or_default()
341 ))
342 })
343 })
344 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
345 });
346 let payload_json = serde_json::to_string(&payload).map_err(|err| {
347 log::error!("Failed to serialize payload to JSON: {err}");
348 Error::Internal(anyhow!(err))
349 })?;
350
351 reqwest::Client::new()
352 .post(slack_panics_webhook)
353 .header("Content-Type", "application/json")
354 .body(payload_json)
355 .send()
356 .await
357 .map_err(|err| {
358 log::error!("Failed to send payload to Slack: {err}");
359 Error::Internal(anyhow!(err))
360 })?;
361 }
362
363 Ok(())
364}
365
366fn report_to_slack(panic: &Panic) -> bool {
367 if panic.os_name == "Linux" {
368 if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
369 return false;
370 }
371
372 if panic
373 .payload
374 .contains("GPU has crashed, and no debug information is available")
375 {
376 return false;
377 }
378 }
379
380 true
381}
382
383pub async fn post_events(
384 Extension(app): Extension<Arc<AppState>>,
385 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
386 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
387 body: Bytes,
388) -> Result<()> {
389 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
390 Err(Error::http(
391 StatusCode::NOT_IMPLEMENTED,
392 "not supported".into(),
393 ))?
394 };
395
396 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
397 return Err(Error::http(
398 StatusCode::INTERNAL_SERVER_ERROR,
399 "events not enabled".into(),
400 ))?;
401 };
402
403 let checksum_matched = checksum == expected;
404
405 let request_body: telemetry_events::EventRequestBody =
406 serde_json::from_slice(&body).map_err(|err| {
407 log::error!("can't parse event json: {err}");
408 Error::Internal(anyhow!(err))
409 })?;
410
411 let mut to_upload = ToUpload::default();
412 let Some(last_event) = request_body.events.last() else {
413 return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
414 };
415 let country_code = country_code_header.map(|h| h.to_string());
416
417 let first_event_at = chrono::Utc::now()
418 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
419
420 for wrapper in &request_body.events {
421 match &wrapper.event {
422 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
423 event.clone(),
424 wrapper,
425 &request_body,
426 first_event_at,
427 country_code.clone(),
428 checksum_matched,
429 )),
430 // Needed for clients sending old copilot_event types
431 Event::Copilot(_) => {}
432 Event::InlineCompletion(event) => {
433 to_upload
434 .inline_completion_events
435 .push(InlineCompletionEventRow::from_event(
436 event.clone(),
437 wrapper,
438 &request_body,
439 first_event_at,
440 country_code.clone(),
441 checksum_matched,
442 ))
443 }
444 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
445 event.clone(),
446 wrapper,
447 &request_body,
448 first_event_at,
449 checksum_matched,
450 )),
451 Event::Assistant(event) => {
452 to_upload
453 .assistant_events
454 .push(AssistantEventRow::from_event(
455 event.clone(),
456 wrapper,
457 &request_body,
458 first_event_at,
459 checksum_matched,
460 ))
461 }
462 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
463 event.clone(),
464 wrapper,
465 &request_body,
466 first_event_at,
467 checksum_matched,
468 )),
469 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
470 event.clone(),
471 wrapper,
472 &request_body,
473 first_event_at,
474 checksum_matched,
475 )),
476 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
477 event.clone(),
478 wrapper,
479 &request_body,
480 first_event_at,
481 checksum_matched,
482 )),
483 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
484 event.clone(),
485 wrapper,
486 &request_body,
487 first_event_at,
488 checksum_matched,
489 )),
490 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
491 event.clone(),
492 wrapper,
493 &request_body,
494 first_event_at,
495 checksum_matched,
496 )),
497 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
498 event.clone(),
499 wrapper,
500 &request_body,
501 first_event_at,
502 checksum_matched,
503 )),
504 Event::Extension(event) => {
505 let metadata = app
506 .db
507 .get_extension_version(&event.extension_id, &event.version)
508 .await?;
509 to_upload
510 .extension_events
511 .push(ExtensionEventRow::from_event(
512 event.clone(),
513 wrapper,
514 &request_body,
515 metadata,
516 first_event_at,
517 checksum_matched,
518 ))
519 }
520 Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
521 event.clone(),
522 wrapper,
523 &request_body,
524 first_event_at,
525 checksum_matched,
526 )),
527 }
528 }
529
530 to_upload
531 .upload(&clickhouse_client)
532 .await
533 .map_err(|err| Error::Internal(anyhow!(err)))?;
534
535 Ok(())
536}
537
/// Rows collected from one events request, grouped by destination table.
///
/// Starts empty (`Default`); `upload` writes each vector to the table named after it.
#[derive(Default)]
struct ToUpload {
    editor_events: Vec<EditorEventRow>,
    inline_completion_events: Vec<InlineCompletionEventRow>,
    assistant_events: Vec<AssistantEventRow>,
    call_events: Vec<CallEventRow>,
    cpu_events: Vec<CpuEventRow>,
    memory_events: Vec<MemoryEventRow>,
    app_events: Vec<AppEventRow>,
    setting_events: Vec<SettingEventRow>,
    extension_events: Vec<ExtensionEventRow>,
    edit_events: Vec<EditEventRow>,
    action_events: Vec<ActionEventRow>,
    repl_events: Vec<ReplEventRow>,
}
553
554impl ToUpload {
555 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
556 const EDITOR_EVENTS_TABLE: &str = "editor_events";
557 write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
558 .await
559 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
560
561 const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
562 write_to_table(
563 INLINE_COMPLETION_EVENTS_TABLE,
564 &self.inline_completion_events,
565 clickhouse_client,
566 )
567 .await
568 .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
569
570 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
571 write_to_table(
572 ASSISTANT_EVENTS_TABLE,
573 &self.assistant_events,
574 clickhouse_client,
575 )
576 .await
577 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
578
579 const CALL_EVENTS_TABLE: &str = "call_events";
580 write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
581 .await
582 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
583
584 const CPU_EVENTS_TABLE: &str = "cpu_events";
585 write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
586 .await
587 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
588
589 const MEMORY_EVENTS_TABLE: &str = "memory_events";
590 write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
591 .await
592 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
593
594 const APP_EVENTS_TABLE: &str = "app_events";
595 write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
596 .await
597 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
598
599 const SETTING_EVENTS_TABLE: &str = "setting_events";
600 write_to_table(
601 SETTING_EVENTS_TABLE,
602 &self.setting_events,
603 clickhouse_client,
604 )
605 .await
606 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
607
608 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
609 write_to_table(
610 EXTENSION_EVENTS_TABLE,
611 &self.extension_events,
612 clickhouse_client,
613 )
614 .await
615 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
616
617 const EDIT_EVENTS_TABLE: &str = "edit_events";
618 write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
619 .await
620 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
621
622 const ACTION_EVENTS_TABLE: &str = "action_events";
623 write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
624 .await
625 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
626
627 const REPL_EVENTS_TABLE: &str = "repl_events";
628 write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
629 .await
630 .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
631
632 Ok(())
633 }
634}
635
636pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
637where
638 S: Serializer,
639{
640 if country_code.len() != 2 {
641 use serde::ser::Error;
642 return Err(S::Error::custom(
643 "country_code must be exactly 2 characters",
644 ));
645 }
646
647 let country_code = country_code.as_bytes();
648
649 serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
650}
651
/// One row of the `editor_events` table, built from an `EditorEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditorEventRow {
    system_id: String,
    installation_id: String,
    session_id: Option<String>,
    metrics_id: String,
    operation: String,
    app_version: String,
    file_extension: String,
    os_name: String,
    os_version: String,
    release_channel: String,
    signed_in: bool,
    vim_mode: bool,
    // Serialized as a `u16` by `serialize_country_code`; must be exactly two bytes long.
    #[serde(serialize_with = "serialize_country_code")]
    country_code: String,
    // `from_event` always leaves `region_code` and `city` empty.
    region_code: String,
    city: String,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    copilot_enabled: bool,
    copilot_enabled_for_language: bool,
    // `from_event` always sets this to `false`.
    historical_event: bool,
    architecture: String,
    is_staff: Option<bool>,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
}
681
682impl EditorEventRow {
683 fn from_event(
684 event: EditorEvent,
685 wrapper: &EventWrapper,
686 body: &EventRequestBody,
687 first_event_at: chrono::DateTime<chrono::Utc>,
688 country_code: Option<String>,
689 checksum_matched: bool,
690 ) -> Self {
691 let semver = body.semver();
692 let time =
693 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
694
695 Self {
696 app_version: body.app_version.clone(),
697 major: semver.map(|v| v.major() as i32),
698 minor: semver.map(|v| v.minor() as i32),
699 patch: semver.map(|v| v.patch() as i32),
700 checksum_matched,
701 release_channel: body.release_channel.clone().unwrap_or_default(),
702 os_name: body.os_name.clone(),
703 os_version: body.os_version.clone().unwrap_or_default(),
704 architecture: body.architecture.clone(),
705 system_id: body.system_id.clone().unwrap_or_default(),
706 installation_id: body.installation_id.clone().unwrap_or_default(),
707 session_id: body.session_id.clone(),
708 metrics_id: body.metrics_id.clone().unwrap_or_default(),
709 is_staff: body.is_staff,
710 time: time.timestamp_millis(),
711 operation: event.operation,
712 file_extension: event.file_extension.unwrap_or_default(),
713 signed_in: wrapper.signed_in,
714 vim_mode: event.vim_mode,
715 copilot_enabled: event.copilot_enabled,
716 copilot_enabled_for_language: event.copilot_enabled_for_language,
717 country_code: country_code.unwrap_or("XX".to_string()),
718 region_code: "".to_string(),
719 city: "".to_string(),
720 historical_event: false,
721 }
722 }
723}
724
/// One row of the `inline_completion_events` table, built from an `InlineCompletionEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct InlineCompletionEventRow {
    installation_id: String,
    session_id: Option<String>,
    provider: String,
    suggestion_accepted: bool,
    app_version: String,
    file_extension: String,
    os_name: String,
    os_version: String,
    release_channel: String,
    signed_in: bool,
    // Serialized as a `u16` by `serialize_country_code`; must be exactly two bytes long.
    #[serde(serialize_with = "serialize_country_code")]
    country_code: String,
    // `from_event` always leaves `region_code` and `city` empty.
    region_code: String,
    city: String,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    is_staff: Option<bool>,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
}
748
749impl InlineCompletionEventRow {
750 fn from_event(
751 event: InlineCompletionEvent,
752 wrapper: &EventWrapper,
753 body: &EventRequestBody,
754 first_event_at: chrono::DateTime<chrono::Utc>,
755 country_code: Option<String>,
756 checksum_matched: bool,
757 ) -> Self {
758 let semver = body.semver();
759 let time =
760 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
761
762 Self {
763 app_version: body.app_version.clone(),
764 major: semver.map(|v| v.major() as i32),
765 minor: semver.map(|v| v.minor() as i32),
766 patch: semver.map(|v| v.patch() as i32),
767 checksum_matched,
768 release_channel: body.release_channel.clone().unwrap_or_default(),
769 os_name: body.os_name.clone(),
770 os_version: body.os_version.clone().unwrap_or_default(),
771 installation_id: body.installation_id.clone().unwrap_or_default(),
772 session_id: body.session_id.clone(),
773 is_staff: body.is_staff,
774 time: time.timestamp_millis(),
775 file_extension: event.file_extension.unwrap_or_default(),
776 signed_in: wrapper.signed_in,
777 country_code: country_code.unwrap_or("XX".to_string()),
778 region_code: "".to_string(),
779 city: "".to_string(),
780 provider: event.provider,
781 suggestion_accepted: event.suggestion_accepted,
782 }
783 }
784}
785
/// One row of the `call_events` table, built from a `CallEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct CallEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,
    os_name: String,
    os_version: String,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,

    // ClientEventBase
    installation_id: String,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // CallEventRow
    operation: String,
    room_id: Option<u64>,
    channel_id: Option<u64>,
}
809
810impl CallEventRow {
811 fn from_event(
812 event: CallEvent,
813 wrapper: &EventWrapper,
814 body: &EventRequestBody,
815 first_event_at: chrono::DateTime<chrono::Utc>,
816 checksum_matched: bool,
817 ) -> Self {
818 let semver = body.semver();
819 let time =
820 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
821
822 Self {
823 app_version: body.app_version.clone(),
824 major: semver.map(|v| v.major() as i32),
825 minor: semver.map(|v| v.minor() as i32),
826 patch: semver.map(|v| v.patch() as i32),
827 checksum_matched,
828 release_channel: body.release_channel.clone().unwrap_or_default(),
829 os_name: body.os_name.clone(),
830 os_version: body.os_version.clone().unwrap_or_default(),
831 installation_id: body.installation_id.clone().unwrap_or_default(),
832 session_id: body.session_id.clone(),
833 is_staff: body.is_staff,
834 time: time.timestamp_millis(),
835 operation: event.operation,
836 room_id: event.room_id,
837 channel_id: event.channel_id,
838 }
839 }
840}
841
/// One row of the `assistant_events` table, built from an `AssistantEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AssistantEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // AssistantEventRow
    conversation_id: String,
    kind: String,
    phase: String,
    model: String,
    // Response latency in whole milliseconds, when the event reported one.
    response_latency_in_ms: Option<i64>,
    error_message: Option<String>,
}
868
869impl AssistantEventRow {
870 fn from_event(
871 event: AssistantEvent,
872 wrapper: &EventWrapper,
873 body: &EventRequestBody,
874 first_event_at: chrono::DateTime<chrono::Utc>,
875 checksum_matched: bool,
876 ) -> Self {
877 let semver = body.semver();
878 let time =
879 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
880
881 Self {
882 app_version: body.app_version.clone(),
883 major: semver.map(|v| v.major() as i32),
884 minor: semver.map(|v| v.minor() as i32),
885 patch: semver.map(|v| v.patch() as i32),
886 checksum_matched,
887 release_channel: body.release_channel.clone().unwrap_or_default(),
888 os_name: body.os_name.clone(),
889 os_version: body.os_version.clone().unwrap_or_default(),
890 installation_id: body.installation_id.clone(),
891 session_id: body.session_id.clone(),
892 is_staff: body.is_staff,
893 time: time.timestamp_millis(),
894 conversation_id: event.conversation_id.unwrap_or_default(),
895 kind: event.kind.to_string(),
896 phase: event.phase.to_string(),
897 model: event.model,
898 response_latency_in_ms: event
899 .response_latency
900 .map(|latency| latency.as_millis() as i64),
901 error_message: event.error_message,
902 }
903 }
904}
905
/// One row of the `cpu_events` table, built from a `CpuEvent`.
#[derive(Debug, clickhouse::Row, Serialize)]
pub struct CpuEventRow {
    system_id: Option<String>,
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    usage_as_percentage: f32,
    core_count: u32,
    app_version: String,
    release_channel: String,
    os_name: String,
    os_version: String,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // pub normalized_cpu_usage: f64, MATERIALIZED
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
}
925
926impl CpuEventRow {
927 fn from_event(
928 event: CpuEvent,
929 wrapper: &EventWrapper,
930 body: &EventRequestBody,
931 first_event_at: chrono::DateTime<chrono::Utc>,
932 checksum_matched: bool,
933 ) -> Self {
934 let semver = body.semver();
935 let time =
936 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
937
938 Self {
939 app_version: body.app_version.clone(),
940 major: semver.map(|v| v.major() as i32),
941 minor: semver.map(|v| v.minor() as i32),
942 patch: semver.map(|v| v.patch() as i32),
943 checksum_matched,
944 release_channel: body.release_channel.clone().unwrap_or_default(),
945 os_name: body.os_name.clone(),
946 os_version: body.os_version.clone().unwrap_or_default(),
947 system_id: body.system_id.clone(),
948 installation_id: body.installation_id.clone(),
949 session_id: body.session_id.clone(),
950 is_staff: body.is_staff,
951 time: time.timestamp_millis(),
952 usage_as_percentage: event.usage_as_percentage,
953 core_count: event.core_count,
954 }
955 }
956}
957
/// One row of the `memory_events` table, built from a `MemoryEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct MemoryEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    system_id: Option<String>,
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // MemoryEventRow
    memory_in_bytes: u64,
    virtual_memory_in_bytes: u64,
}
981
982impl MemoryEventRow {
983 fn from_event(
984 event: MemoryEvent,
985 wrapper: &EventWrapper,
986 body: &EventRequestBody,
987 first_event_at: chrono::DateTime<chrono::Utc>,
988 checksum_matched: bool,
989 ) -> Self {
990 let semver = body.semver();
991 let time =
992 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
993
994 Self {
995 app_version: body.app_version.clone(),
996 major: semver.map(|v| v.major() as i32),
997 minor: semver.map(|v| v.minor() as i32),
998 patch: semver.map(|v| v.patch() as i32),
999 checksum_matched,
1000 release_channel: body.release_channel.clone().unwrap_or_default(),
1001 os_name: body.os_name.clone(),
1002 os_version: body.os_version.clone().unwrap_or_default(),
1003 system_id: body.system_id.clone(),
1004 installation_id: body.installation_id.clone(),
1005 session_id: body.session_id.clone(),
1006 is_staff: body.is_staff,
1007 time: time.timestamp_millis(),
1008 memory_in_bytes: event.memory_in_bytes,
1009 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1010 }
1011 }
1012}
1013
/// One row of the `app_events` table, built from an `AppEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AppEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    system_id: Option<String>,
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // AppEventRow
    operation: String,
}
1036
1037impl AppEventRow {
1038 fn from_event(
1039 event: AppEvent,
1040 wrapper: &EventWrapper,
1041 body: &EventRequestBody,
1042 first_event_at: chrono::DateTime<chrono::Utc>,
1043 checksum_matched: bool,
1044 ) -> Self {
1045 let semver = body.semver();
1046 let time =
1047 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1048
1049 Self {
1050 app_version: body.app_version.clone(),
1051 major: semver.map(|v| v.major() as i32),
1052 minor: semver.map(|v| v.minor() as i32),
1053 patch: semver.map(|v| v.patch() as i32),
1054 checksum_matched,
1055 release_channel: body.release_channel.clone().unwrap_or_default(),
1056 os_name: body.os_name.clone(),
1057 os_version: body.os_version.clone().unwrap_or_default(),
1058 system_id: body.system_id.clone(),
1059 installation_id: body.installation_id.clone(),
1060 session_id: body.session_id.clone(),
1061 is_staff: body.is_staff,
1062 time: time.timestamp_millis(),
1063 operation: event.operation,
1064 }
1065 }
1066}
1067
/// One row of the `setting_events` table, built from a `SettingEvent`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the parsed app version; `None` when `body.semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    // Whether the request's checksum header matched the expected checksum.
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    system_id: Option<String>,
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // SettingEventRow
    setting: String,
    value: String,
}
1090
1091impl SettingEventRow {
1092 fn from_event(
1093 event: SettingEvent,
1094 wrapper: &EventWrapper,
1095 body: &EventRequestBody,
1096 first_event_at: chrono::DateTime<chrono::Utc>,
1097 checksum_matched: bool,
1098 ) -> Self {
1099 let semver = body.semver();
1100 let time =
1101 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1102
1103 Self {
1104 app_version: body.app_version.clone(),
1105 major: semver.map(|v| v.major() as i32),
1106 minor: semver.map(|v| v.minor() as i32),
1107 checksum_matched,
1108 patch: semver.map(|v| v.patch() as i32),
1109 release_channel: body.release_channel.clone().unwrap_or_default(),
1110 os_name: body.os_name.clone(),
1111 os_version: body.os_version.clone().unwrap_or_default(),
1112 system_id: body.system_id.clone(),
1113 installation_id: body.installation_id.clone(),
1114 session_id: body.session_id.clone(),
1115 is_staff: body.is_staff,
1116 time: time.timestamp_millis(),
1117 setting: event.setting,
1118 value: event.value,
1119 }
1120 }
1121}
1122
/// Flattened row for an [`ExtensionEvent`], serializable through `serde` and
/// `clickhouse::Row`.
///
/// Built by `ExtensionEventRow::from_event`, which combines the request body,
/// the event itself, and optional [`ExtensionMetadata`] supplied by the caller.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Components of `body.semver()`; `None` when no semantic version is available.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Stored as supplied by the caller of `from_event`.
    /// NOTE(review): presumably whether the request checksum matched — confirm at the call site.
    checksum_matched: bool,
    /// Defaults to an empty string when the request body has no release channel.
    release_channel: String,
    os_name: String,
    /// Defaults to an empty string when the request body has no OS version.
    os_version: String,

    // ClientEventBase
    system_id: Option<String>,
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as produced by `timestamp_millis()`:
    /// `first_event_at` plus the wrapper's millisecond offset.
    time: i64,

    // ExtensionEventRow
    /// Taken from `ExtensionEvent::extension_id`.
    extension_id: Arc<str>,
    /// Taken from `ExtensionEvent::version`.
    extension_version: Arc<str>,
    /// `true` when no extension metadata was supplied to `from_event`.
    dev: bool,
    /// `manifest.schema_version` of the supplied metadata, if any.
    schema_version: Option<i32>,
    /// String form of `manifest.wasm_api_version` of the supplied metadata, if any.
    wasm_api_version: Option<String>,
}
1149
1150impl ExtensionEventRow {
1151 fn from_event(
1152 event: ExtensionEvent,
1153 wrapper: &EventWrapper,
1154 body: &EventRequestBody,
1155 extension_metadata: Option<ExtensionMetadata>,
1156 first_event_at: chrono::DateTime<chrono::Utc>,
1157 checksum_matched: bool,
1158 ) -> Self {
1159 let semver = body.semver();
1160 let time =
1161 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1162
1163 Self {
1164 app_version: body.app_version.clone(),
1165 major: semver.map(|v| v.major() as i32),
1166 minor: semver.map(|v| v.minor() as i32),
1167 patch: semver.map(|v| v.patch() as i32),
1168 checksum_matched,
1169 release_channel: body.release_channel.clone().unwrap_or_default(),
1170 os_name: body.os_name.clone(),
1171 os_version: body.os_version.clone().unwrap_or_default(),
1172 system_id: body.system_id.clone(),
1173 installation_id: body.installation_id.clone(),
1174 session_id: body.session_id.clone(),
1175 is_staff: body.is_staff,
1176 time: time.timestamp_millis(),
1177 extension_id: event.extension_id,
1178 extension_version: event.version,
1179 dev: extension_metadata.is_none(),
1180 schema_version: extension_metadata
1181 .as_ref()
1182 .and_then(|metadata| metadata.manifest.schema_version),
1183 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1184 metadata
1185 .manifest
1186 .wasm_api_version
1187 .as_ref()
1188 .map(|version| version.to_string())
1189 }),
1190 }
1191 }
1192}
1193
/// Flattened row for a [`ReplEvent`], serializable through `serde` and
/// `clickhouse::Row`.
///
/// Built by `ReplEventRow::from_event`. Unlike several sibling rows in this
/// file, this row has no `system_id` field.
/// NOTE(review): presumably the target table has no such column — confirm.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ReplEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Components of `body.semver()`; `None` when no semantic version is available.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Stored as supplied by the caller of `from_event`.
    /// NOTE(review): presumably whether the request checksum matched — confirm at the call site.
    checksum_matched: bool,
    /// Defaults to an empty string when the request body has no release channel.
    release_channel: String,
    os_name: String,
    /// Defaults to an empty string when the request body has no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as produced by `timestamp_millis()`:
    /// `first_event_at` plus the wrapper's millisecond offset.
    time: i64,

    // ReplEventRow
    /// Taken from `ReplEvent::kernel_language`.
    kernel_language: String,
    /// Taken from `ReplEvent::kernel_status`.
    kernel_status: String,
    /// Taken from `ReplEvent::repl_session_id`; distinct from the client `session_id` above.
    repl_session_id: String,
}
1217
1218impl ReplEventRow {
1219 fn from_event(
1220 event: ReplEvent,
1221 wrapper: &EventWrapper,
1222 body: &EventRequestBody,
1223 first_event_at: chrono::DateTime<chrono::Utc>,
1224 checksum_matched: bool,
1225 ) -> Self {
1226 let semver = body.semver();
1227 let time =
1228 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1229
1230 Self {
1231 app_version: body.app_version.clone(),
1232 major: semver.map(|v| v.major() as i32),
1233 minor: semver.map(|v| v.minor() as i32),
1234 patch: semver.map(|v| v.patch() as i32),
1235 checksum_matched,
1236 release_channel: body.release_channel.clone().unwrap_or_default(),
1237 os_name: body.os_name.clone(),
1238 os_version: body.os_version.clone().unwrap_or_default(),
1239 installation_id: body.installation_id.clone(),
1240 session_id: body.session_id.clone(),
1241 is_staff: body.is_staff,
1242 time: time.timestamp_millis(),
1243 kernel_language: event.kernel_language,
1244 kernel_status: event.kernel_status,
1245 repl_session_id: event.repl_session_id,
1246 }
1247 }
1248}
1249
/// Flattened row for an [`EditEvent`], serializable through `serde` and
/// `clickhouse::Row`.
///
/// Built by `EditEventRow::from_event`, which also derives the edit period
/// (`period_start`..`period_end`) from the event time and `EditEvent::duration`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Components of `body.semver()`; `None` when no semantic version is available.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Stored as supplied by the caller of `from_event`.
    /// NOTE(review): presumably whether the request checksum matched — confirm at the call site.
    checksum_matched: bool,
    /// Defaults to an empty string when the request body has no release channel.
    release_channel: String,
    os_name: String,
    /// Defaults to an empty string when the request body has no OS version.
    os_version: String,

    // ClientEventBase
    system_id: Option<String>,
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    // The rename below is deliberate; "fixing" the spelling would change the serialized name.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as produced by `timestamp_millis()`:
    /// `first_event_at` plus the wrapper's millisecond offset.
    time: i64,

    // EditEventRow
    /// Event time minus `EditEvent::duration` milliseconds, via `timestamp_millis()`.
    period_start: i64,
    /// Same instant as `time`.
    period_end: i64,
    /// Taken from `EditEvent::environment`.
    environment: String,
}
1276
1277impl EditEventRow {
1278 fn from_event(
1279 event: EditEvent,
1280 wrapper: &EventWrapper,
1281 body: &EventRequestBody,
1282 first_event_at: chrono::DateTime<chrono::Utc>,
1283 checksum_matched: bool,
1284 ) -> Self {
1285 let semver = body.semver();
1286 let time =
1287 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1288
1289 let period_start = time - chrono::Duration::milliseconds(event.duration);
1290 let period_end = time;
1291
1292 Self {
1293 app_version: body.app_version.clone(),
1294 major: semver.map(|v| v.major() as i32),
1295 minor: semver.map(|v| v.minor() as i32),
1296 patch: semver.map(|v| v.patch() as i32),
1297 checksum_matched,
1298 release_channel: body.release_channel.clone().unwrap_or_default(),
1299 os_name: body.os_name.clone(),
1300 os_version: body.os_version.clone().unwrap_or_default(),
1301 system_id: body.system_id.clone(),
1302 installation_id: body.installation_id.clone(),
1303 session_id: body.session_id.clone(),
1304 is_staff: body.is_staff,
1305 time: time.timestamp_millis(),
1306 period_start: period_start.timestamp_millis(),
1307 period_end: period_end.timestamp_millis(),
1308 environment: event.environment,
1309 }
1310 }
1311}
1312
/// Flattened row for an [`ActionEvent`], serializable through `serde` and
/// `clickhouse::Row`.
///
/// Built by `ActionEventRow::from_event`. Unlike several sibling rows in this
/// file, this row has no `system_id` field.
/// NOTE(review): presumably the target table has no such column — confirm.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Components of `body.semver()`; `None` when no semantic version is available.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Stored as supplied by the caller of `from_event`.
    /// NOTE(review): presumably whether the request checksum matched — confirm at the call site.
    checksum_matched: bool,
    /// Defaults to an empty string when the request body has no release channel.
    release_channel: String,
    os_name: String,
    /// Defaults to an empty string when the request body has no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    // The rename below is deliberate; "fixing" the spelling would change the serialized name.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as produced by `timestamp_millis()`:
    /// `first_event_at` plus the wrapper's millisecond offset.
    time: i64,
    // ActionEventRow
    /// Taken from `ActionEvent::source`.
    source: String,
    /// Taken from `ActionEvent::action`.
    action: String,
}
1336
1337impl ActionEventRow {
1338 fn from_event(
1339 event: ActionEvent,
1340 wrapper: &EventWrapper,
1341 body: &EventRequestBody,
1342 first_event_at: chrono::DateTime<chrono::Utc>,
1343 checksum_matched: bool,
1344 ) -> Self {
1345 let semver = body.semver();
1346 let time =
1347 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1348
1349 Self {
1350 app_version: body.app_version.clone(),
1351 major: semver.map(|v| v.major() as i32),
1352 minor: semver.map(|v| v.minor() as i32),
1353 patch: semver.map(|v| v.patch() as i32),
1354 checksum_matched,
1355 release_channel: body.release_channel.clone().unwrap_or_default(),
1356 os_name: body.os_name.clone(),
1357 os_version: body.os_version.clone().unwrap_or_default(),
1358 installation_id: body.installation_id.clone(),
1359 session_id: body.session_id.clone(),
1360 is_staff: body.is_staff,
1361 time: time.timestamp_millis(),
1362 source: event.source,
1363 action: event.action,
1364 }
1365 }
1366}
1367
1368pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1369 let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1370
1371 let mut summer = Sha256::new();
1372 summer.update(checksum_seed);
1373 summer.update(json);
1374 summer.update(checksum_seed);
1375 Some(summer.finalize().into_iter().collect())
1376}