1use super::ips_file::IpsFile;
2use crate::api::CloudflareIpCountryHeader;
3use crate::{api::slack, AppState, Error, Result};
4use anyhow::{anyhow, Context};
5use aws_sdk_s3::primitives::ByteStream;
6use axum::{
7 body::Bytes,
8 headers::Header,
9 http::{HeaderMap, HeaderName, StatusCode},
10 routing::post,
11 Extension, Router, TypedHeader,
12};
13use rpc::ExtensionMetadata;
14use semantic_version::SemanticVersion;
15use serde::{Serialize, Serializer};
16use sha2::{Digest, Sha256};
17use std::sync::{Arc, OnceLock};
18use telemetry_events::{
19 ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
20 EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
21 SettingEvent,
22};
23use uuid::Uuid;
24
25static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
26
27pub fn router() -> Router {
28 Router::new()
29 .route("/telemetry/events", post(post_events))
30 .route("/telemetry/crashes", post(post_crash))
31 .route("/telemetry/panics", post(post_panic))
32 .route("/telemetry/hangs", post(post_hang))
33}
34
35pub struct ZedChecksumHeader(Vec<u8>);
36
37impl Header for ZedChecksumHeader {
38 fn name() -> &'static HeaderName {
39 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
40 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
41 }
42
43 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
44 where
45 Self: Sized,
46 I: Iterator<Item = &'i axum::http::HeaderValue>,
47 {
48 let checksum = values
49 .next()
50 .ok_or_else(axum::headers::Error::invalid)?
51 .to_str()
52 .map_err(|_| axum::headers::Error::invalid())?;
53
54 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
55 Ok(Self(bytes))
56 }
57
58 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
59 unimplemented!()
60 }
61}
62
63pub async fn post_crash(
64 Extension(app): Extension<Arc<AppState>>,
65 headers: HeaderMap,
66 body: Bytes,
67) -> Result<()> {
68 let report = IpsFile::parse(&body)?;
69 let version_threshold = SemanticVersion::new(0, 123, 0);
70
71 let bundle_id = &report.header.bundle_id;
72 let app_version = &report.app_version();
73
74 if bundle_id == "dev.zed.Zed-Dev" {
75 log::error!("Crash uploads from {} are ignored.", bundle_id);
76 return Ok(());
77 }
78
79 if app_version.is_none() || app_version.unwrap() < version_threshold {
80 log::error!(
81 "Crash uploads from {} are ignored.",
82 report.header.app_version
83 );
84 return Ok(());
85 }
86 let app_version = app_version.unwrap();
87
88 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
89 let response = blob_store_client
90 .head_object()
91 .bucket(CRASH_REPORTS_BUCKET)
92 .key(report.header.incident_id.clone() + ".ips")
93 .send()
94 .await;
95
96 if response.is_ok() {
97 log::info!("We've already uploaded this crash");
98 return Ok(());
99 }
100
101 blob_store_client
102 .put_object()
103 .bucket(CRASH_REPORTS_BUCKET)
104 .key(report.header.incident_id.clone() + ".ips")
105 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
106 .body(ByteStream::from(body.to_vec()))
107 .send()
108 .await
109 .map_err(|e| log::error!("Failed to upload crash: {}", e))
110 .ok();
111 }
112
113 let recent_panic_on: Option<i64> = headers
114 .get("x-zed-panicked-on")
115 .and_then(|h| h.to_str().ok())
116 .and_then(|s| s.parse().ok());
117
118 let installation_id = headers
119 .get("x-zed-installation-id")
120 .and_then(|h| h.to_str().ok())
121 .map(|s| s.to_string())
122 .unwrap_or_default();
123
124 let mut recent_panic = None;
125
126 if let Some(recent_panic_on) = recent_panic_on {
127 let crashed_at = match report.timestamp() {
128 Ok(t) => Some(t),
129 Err(e) => {
130 log::error!("Can't parse {}: {}", report.header.timestamp, e);
131 None
132 }
133 };
134 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
135 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
136 }
137 }
138
139 let description = report.description(recent_panic);
140 let summary = report.backtrace_summary();
141
142 tracing::error!(
143 service = "client",
144 version = %report.header.app_version,
145 os_version = %report.header.os_version,
146 bundle_id = %report.header.bundle_id,
147 incident_id = %report.header.incident_id,
148 installation_id = %installation_id,
149 description = %description,
150 backtrace = %summary,
151 "crash report");
152
153 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
154 let payload = slack::WebhookBody::new(|w| {
155 w.add_section(|s| s.text(slack::Text::markdown(description)))
156 .add_section(|s| {
157 s.add_field(slack::Text::markdown(format!(
158 "*Version:*\n{} ({})",
159 bundle_id, app_version
160 )))
161 .add_field({
162 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
163 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
164 hostname.strip_prefix("http://").unwrap_or_default()
165 });
166
167 slack::Text::markdown(format!(
168 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
169 CRASH_REPORTS_BUCKET,
170 hostname,
171 report.header.incident_id,
172 report
173 .header
174 .incident_id
175 .chars()
176 .take(8)
177 .collect::<String>(),
178 ))
179 })
180 })
181 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
182 });
183 let payload_json = serde_json::to_string(&payload).map_err(|err| {
184 log::error!("Failed to serialize payload to JSON: {err}");
185 Error::Internal(anyhow!(err))
186 })?;
187
188 reqwest::Client::new()
189 .post(slack_panics_webhook)
190 .header("Content-Type", "application/json")
191 .body(payload_json)
192 .send()
193 .await
194 .map_err(|err| {
195 log::error!("Failed to send payload to Slack: {err}");
196 Error::Internal(anyhow!(err))
197 })?;
198 }
199
200 Ok(())
201}
202
203pub async fn post_hang(
204 Extension(app): Extension<Arc<AppState>>,
205 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
206 body: Bytes,
207) -> Result<()> {
208 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
209 return Err(Error::http(
210 StatusCode::INTERNAL_SERVER_ERROR,
211 "events not enabled".into(),
212 ))?;
213 };
214
215 if checksum != expected {
216 return Err(Error::http(
217 StatusCode::BAD_REQUEST,
218 "invalid checksum".into(),
219 ))?;
220 }
221
222 let incident_id = Uuid::new_v4().to_string();
223
224 // dump JSON into S3 so we can get frame offsets if we need to.
225 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
226 blob_store_client
227 .put_object()
228 .bucket(CRASH_REPORTS_BUCKET)
229 .key(incident_id.clone() + ".hang.json")
230 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
231 .body(ByteStream::from(body.to_vec()))
232 .send()
233 .await
234 .map_err(|e| log::error!("Failed to upload crash: {}", e))
235 .ok();
236 }
237
238 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
239 log::error!("can't parse report json: {err}");
240 Error::Internal(anyhow!(err))
241 })?;
242
243 let mut backtrace = "Possible hang detected on main thread:".to_string();
244 let unknown = "<unknown>".to_string();
245 for frame in report.backtrace.iter() {
246 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
247 }
248
249 tracing::error!(
250 service = "client",
251 version = %report.app_version.unwrap_or_default().to_string(),
252 os_name = %report.os_name,
253 os_version = report.os_version.unwrap_or_default().to_string(),
254 incident_id = %incident_id,
255 installation_id = %report.installation_id.unwrap_or_default(),
256 backtrace = %backtrace,
257 "hang report");
258
259 Ok(())
260}
261
262pub async fn post_panic(
263 Extension(app): Extension<Arc<AppState>>,
264 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
265 body: Bytes,
266) -> Result<()> {
267 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
268 return Err(Error::http(
269 StatusCode::INTERNAL_SERVER_ERROR,
270 "events not enabled".into(),
271 ))?;
272 };
273
274 if checksum != expected {
275 return Err(Error::http(
276 StatusCode::BAD_REQUEST,
277 "invalid checksum".into(),
278 ))?;
279 }
280
281 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
282 .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
283 let panic = report.panic;
284
285 if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
286 return Err(Error::http(
287 StatusCode::BAD_REQUEST,
288 "invalid os version".into(),
289 ))?;
290 }
291
292 tracing::error!(
293 service = "client",
294 version = %panic.app_version,
295 os_name = %panic.os_name,
296 os_version = %panic.os_version.clone().unwrap_or_default(),
297 installation_id = %panic.installation_id.unwrap_or_default(),
298 description = %panic.payload,
299 backtrace = %panic.backtrace.join("\n"),
300 "panic report");
301
302 let backtrace = if panic.backtrace.len() > 25 {
303 let total = panic.backtrace.len();
304 format!(
305 "{}\n and {} more",
306 panic
307 .backtrace
308 .iter()
309 .take(20)
310 .cloned()
311 .collect::<Vec<_>>()
312 .join("\n"),
313 total - 20
314 )
315 } else {
316 panic.backtrace.join("\n")
317 };
318 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
319
320 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
321 let payload = slack::WebhookBody::new(|w| {
322 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
323 .add_section(|s| {
324 s.add_field(slack::Text::markdown(format!(
325 "*Version:*\n {} ",
326 panic.app_version
327 )))
328 .add_field({
329 slack::Text::markdown(format!(
330 "*OS:*\n{} {}",
331 panic.os_name,
332 panic.os_version.unwrap_or_default()
333 ))
334 })
335 })
336 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
337 });
338 let payload_json = serde_json::to_string(&payload).map_err(|err| {
339 log::error!("Failed to serialize payload to JSON: {err}");
340 Error::Internal(anyhow!(err))
341 })?;
342
343 reqwest::Client::new()
344 .post(slack_panics_webhook)
345 .header("Content-Type", "application/json")
346 .body(payload_json)
347 .send()
348 .await
349 .map_err(|err| {
350 log::error!("Failed to send payload to Slack: {err}");
351 Error::Internal(anyhow!(err))
352 })?;
353 }
354
355 Ok(())
356}
357
358pub async fn post_events(
359 Extension(app): Extension<Arc<AppState>>,
360 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
361 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
362 body: Bytes,
363) -> Result<()> {
364 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
365 Err(Error::http(
366 StatusCode::NOT_IMPLEMENTED,
367 "not supported".into(),
368 ))?
369 };
370
371 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
372 return Err(Error::http(
373 StatusCode::INTERNAL_SERVER_ERROR,
374 "events not enabled".into(),
375 ))?;
376 };
377
378 let checksum_matched = checksum == expected;
379
380 let request_body: telemetry_events::EventRequestBody =
381 serde_json::from_slice(&body).map_err(|err| {
382 log::error!("can't parse event json: {err}");
383 Error::Internal(anyhow!(err))
384 })?;
385
386 let mut to_upload = ToUpload::default();
387 let Some(last_event) = request_body.events.last() else {
388 return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
389 };
390 let country_code = country_code_header.map(|h| h.to_string());
391
392 let first_event_at = chrono::Utc::now()
393 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
394
395 for wrapper in &request_body.events {
396 match &wrapper.event {
397 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
398 event.clone(),
399 &wrapper,
400 &request_body,
401 first_event_at,
402 country_code.clone(),
403 checksum_matched,
404 )),
405 // Needed for clients sending old copilot_event types
406 Event::Copilot(_) => {}
407 Event::InlineCompletion(event) => {
408 to_upload
409 .inline_completion_events
410 .push(InlineCompletionEventRow::from_event(
411 event.clone(),
412 &wrapper,
413 &request_body,
414 first_event_at,
415 country_code.clone(),
416 checksum_matched,
417 ))
418 }
419 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
420 event.clone(),
421 &wrapper,
422 &request_body,
423 first_event_at,
424 checksum_matched,
425 )),
426 Event::Assistant(event) => {
427 to_upload
428 .assistant_events
429 .push(AssistantEventRow::from_event(
430 event.clone(),
431 &wrapper,
432 &request_body,
433 first_event_at,
434 checksum_matched,
435 ))
436 }
437 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
438 event.clone(),
439 &wrapper,
440 &request_body,
441 first_event_at,
442 checksum_matched,
443 )),
444 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
445 event.clone(),
446 &wrapper,
447 &request_body,
448 first_event_at,
449 checksum_matched,
450 )),
451 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
452 event.clone(),
453 &wrapper,
454 &request_body,
455 first_event_at,
456 checksum_matched,
457 )),
458 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
459 event.clone(),
460 &wrapper,
461 &request_body,
462 first_event_at,
463 checksum_matched,
464 )),
465 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
466 event.clone(),
467 &wrapper,
468 &request_body,
469 first_event_at,
470 checksum_matched,
471 )),
472 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
473 event.clone(),
474 &wrapper,
475 &request_body,
476 first_event_at,
477 checksum_matched,
478 )),
479 Event::Extension(event) => {
480 let metadata = app
481 .db
482 .get_extension_version(&event.extension_id, &event.version)
483 .await?;
484 to_upload
485 .extension_events
486 .push(ExtensionEventRow::from_event(
487 event.clone(),
488 &wrapper,
489 &request_body,
490 metadata,
491 first_event_at,
492 checksum_matched,
493 ))
494 }
495 Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
496 event.clone(),
497 &wrapper,
498 &request_body,
499 first_event_at,
500 checksum_matched,
501 )),
502 }
503 }
504
505 to_upload
506 .upload(&clickhouse_client)
507 .await
508 .map_err(|err| Error::Internal(anyhow!(err)))?;
509
510 Ok(())
511}
512
513#[derive(Default)]
514struct ToUpload {
515 editor_events: Vec<EditorEventRow>,
516 inline_completion_events: Vec<InlineCompletionEventRow>,
517 assistant_events: Vec<AssistantEventRow>,
518 call_events: Vec<CallEventRow>,
519 cpu_events: Vec<CpuEventRow>,
520 memory_events: Vec<MemoryEventRow>,
521 app_events: Vec<AppEventRow>,
522 setting_events: Vec<SettingEventRow>,
523 extension_events: Vec<ExtensionEventRow>,
524 edit_events: Vec<EditEventRow>,
525 action_events: Vec<ActionEventRow>,
526 repl_events: Vec<ReplEventRow>,
527}
528
529impl ToUpload {
530 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
531 const EDITOR_EVENTS_TABLE: &str = "editor_events";
532 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
533 .await
534 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
535
536 const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
537 Self::upload_to_table(
538 INLINE_COMPLETION_EVENTS_TABLE,
539 &self.inline_completion_events,
540 clickhouse_client,
541 )
542 .await
543 .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
544
545 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
546 Self::upload_to_table(
547 ASSISTANT_EVENTS_TABLE,
548 &self.assistant_events,
549 clickhouse_client,
550 )
551 .await
552 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
553
554 const CALL_EVENTS_TABLE: &str = "call_events";
555 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
556 .await
557 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
558
559 const CPU_EVENTS_TABLE: &str = "cpu_events";
560 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
561 .await
562 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
563
564 const MEMORY_EVENTS_TABLE: &str = "memory_events";
565 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
566 .await
567 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
568
569 const APP_EVENTS_TABLE: &str = "app_events";
570 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
571 .await
572 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
573
574 const SETTING_EVENTS_TABLE: &str = "setting_events";
575 Self::upload_to_table(
576 SETTING_EVENTS_TABLE,
577 &self.setting_events,
578 clickhouse_client,
579 )
580 .await
581 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
582
583 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
584 Self::upload_to_table(
585 EXTENSION_EVENTS_TABLE,
586 &self.extension_events,
587 clickhouse_client,
588 )
589 .await
590 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
591
592 const EDIT_EVENTS_TABLE: &str = "edit_events";
593 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
594 .await
595 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
596
597 const ACTION_EVENTS_TABLE: &str = "action_events";
598 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
599 .await
600 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
601
602 const REPL_EVENTS_TABLE: &str = "repl_events";
603 Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
604 .await
605 .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
606
607 Ok(())
608 }
609
610 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
611 table: &str,
612 rows: &[T],
613 clickhouse_client: &clickhouse::Client,
614 ) -> anyhow::Result<()> {
615 if rows.is_empty() {
616 return Ok(());
617 }
618
619 let mut insert = clickhouse_client.insert(table)?;
620
621 for event in rows {
622 insert.write(event).await?;
623 }
624
625 insert.end().await?;
626
627 let event_count = rows.len();
628 log::info!(
629 "wrote {event_count} {event_specifier} to '{table}'",
630 event_specifier = if event_count == 1 { "event" } else { "events" }
631 );
632
633 Ok(())
634 }
635}
636
637pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
638where
639 S: Serializer,
640{
641 if country_code.len() != 2 {
642 use serde::ser::Error;
643 return Err(S::Error::custom(
644 "country_code must be exactly 2 characters",
645 ));
646 }
647
648 let country_code = country_code.as_bytes();
649
650 serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
651}
652
653#[derive(Serialize, Debug, clickhouse::Row)]
654pub struct EditorEventRow {
655 installation_id: String,
656 metrics_id: String,
657 operation: String,
658 app_version: String,
659 file_extension: String,
660 os_name: String,
661 os_version: String,
662 release_channel: String,
663 signed_in: bool,
664 vim_mode: bool,
665 #[serde(serialize_with = "serialize_country_code")]
666 country_code: String,
667 region_code: String,
668 city: String,
669 time: i64,
670 copilot_enabled: bool,
671 copilot_enabled_for_language: bool,
672 historical_event: bool,
673 architecture: String,
674 is_staff: Option<bool>,
675 session_id: Option<String>,
676 major: Option<i32>,
677 minor: Option<i32>,
678 patch: Option<i32>,
679 checksum_matched: bool,
680}
681
682impl EditorEventRow {
683 fn from_event(
684 event: EditorEvent,
685 wrapper: &EventWrapper,
686 body: &EventRequestBody,
687 first_event_at: chrono::DateTime<chrono::Utc>,
688 country_code: Option<String>,
689 checksum_matched: bool,
690 ) -> Self {
691 let semver = body.semver();
692 let time =
693 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
694
695 Self {
696 app_version: body.app_version.clone(),
697 major: semver.map(|v| v.major() as i32),
698 minor: semver.map(|v| v.minor() as i32),
699 patch: semver.map(|v| v.patch() as i32),
700 checksum_matched,
701 release_channel: body.release_channel.clone().unwrap_or_default(),
702 os_name: body.os_name.clone(),
703 os_version: body.os_version.clone().unwrap_or_default(),
704 architecture: body.architecture.clone(),
705 installation_id: body.installation_id.clone().unwrap_or_default(),
706 metrics_id: body.metrics_id.clone().unwrap_or_default(),
707 session_id: body.session_id.clone(),
708 is_staff: body.is_staff,
709 time: time.timestamp_millis(),
710 operation: event.operation,
711 file_extension: event.file_extension.unwrap_or_default(),
712 signed_in: wrapper.signed_in,
713 vim_mode: event.vim_mode,
714 copilot_enabled: event.copilot_enabled,
715 copilot_enabled_for_language: event.copilot_enabled_for_language,
716 country_code: country_code.unwrap_or("XX".to_string()),
717 region_code: "".to_string(),
718 city: "".to_string(),
719 historical_event: false,
720 }
721 }
722}
723
724#[derive(Serialize, Debug, clickhouse::Row)]
725pub struct InlineCompletionEventRow {
726 installation_id: String,
727 provider: String,
728 suggestion_accepted: bool,
729 app_version: String,
730 file_extension: String,
731 os_name: String,
732 os_version: String,
733 release_channel: String,
734 signed_in: bool,
735 #[serde(serialize_with = "serialize_country_code")]
736 country_code: String,
737 region_code: String,
738 city: String,
739 time: i64,
740 is_staff: Option<bool>,
741 session_id: Option<String>,
742 major: Option<i32>,
743 minor: Option<i32>,
744 patch: Option<i32>,
745 checksum_matched: bool,
746}
747
748impl InlineCompletionEventRow {
749 fn from_event(
750 event: InlineCompletionEvent,
751 wrapper: &EventWrapper,
752 body: &EventRequestBody,
753 first_event_at: chrono::DateTime<chrono::Utc>,
754 country_code: Option<String>,
755 checksum_matched: bool,
756 ) -> Self {
757 let semver = body.semver();
758 let time =
759 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
760
761 Self {
762 app_version: body.app_version.clone(),
763 major: semver.map(|v| v.major() as i32),
764 minor: semver.map(|v| v.minor() as i32),
765 patch: semver.map(|v| v.patch() as i32),
766 checksum_matched,
767 release_channel: body.release_channel.clone().unwrap_or_default(),
768 os_name: body.os_name.clone(),
769 os_version: body.os_version.clone().unwrap_or_default(),
770 installation_id: body.installation_id.clone().unwrap_or_default(),
771 session_id: body.session_id.clone(),
772 is_staff: body.is_staff,
773 time: time.timestamp_millis(),
774 file_extension: event.file_extension.unwrap_or_default(),
775 signed_in: wrapper.signed_in,
776 country_code: country_code.unwrap_or("XX".to_string()),
777 region_code: "".to_string(),
778 city: "".to_string(),
779 provider: event.provider,
780 suggestion_accepted: event.suggestion_accepted,
781 }
782 }
783}
784
785#[derive(Serialize, Debug, clickhouse::Row)]
786pub struct CallEventRow {
787 // AppInfoBase
788 app_version: String,
789 major: Option<i32>,
790 minor: Option<i32>,
791 patch: Option<i32>,
792 release_channel: String,
793 os_name: String,
794 os_version: String,
795 checksum_matched: bool,
796
797 // ClientEventBase
798 installation_id: String,
799 session_id: Option<String>,
800 is_staff: Option<bool>,
801 time: i64,
802
803 // CallEventRow
804 operation: String,
805 room_id: Option<u64>,
806 channel_id: Option<u64>,
807}
808
809impl CallEventRow {
810 fn from_event(
811 event: CallEvent,
812 wrapper: &EventWrapper,
813 body: &EventRequestBody,
814 first_event_at: chrono::DateTime<chrono::Utc>,
815 checksum_matched: bool,
816 ) -> Self {
817 let semver = body.semver();
818 let time =
819 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
820
821 Self {
822 app_version: body.app_version.clone(),
823 major: semver.map(|v| v.major() as i32),
824 minor: semver.map(|v| v.minor() as i32),
825 patch: semver.map(|v| v.patch() as i32),
826 checksum_matched,
827 release_channel: body.release_channel.clone().unwrap_or_default(),
828 os_name: body.os_name.clone(),
829 os_version: body.os_version.clone().unwrap_or_default(),
830 installation_id: body.installation_id.clone().unwrap_or_default(),
831 session_id: body.session_id.clone(),
832 is_staff: body.is_staff,
833 time: time.timestamp_millis(),
834 operation: event.operation,
835 room_id: event.room_id,
836 channel_id: event.channel_id,
837 }
838 }
839}
840
841#[derive(Serialize, Debug, clickhouse::Row)]
842pub struct AssistantEventRow {
843 // AppInfoBase
844 app_version: String,
845 major: Option<i32>,
846 minor: Option<i32>,
847 patch: Option<i32>,
848 checksum_matched: bool,
849 release_channel: String,
850 os_name: String,
851 os_version: String,
852
853 // ClientEventBase
854 installation_id: Option<String>,
855 session_id: Option<String>,
856 is_staff: Option<bool>,
857 time: i64,
858
859 // AssistantEventRow
860 conversation_id: String,
861 kind: String,
862 model: String,
863 response_latency_in_ms: Option<i64>,
864 error_message: Option<String>,
865}
866
867impl AssistantEventRow {
868 fn from_event(
869 event: AssistantEvent,
870 wrapper: &EventWrapper,
871 body: &EventRequestBody,
872 first_event_at: chrono::DateTime<chrono::Utc>,
873 checksum_matched: bool,
874 ) -> Self {
875 let semver = body.semver();
876 let time =
877 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
878
879 Self {
880 app_version: body.app_version.clone(),
881 major: semver.map(|v| v.major() as i32),
882 minor: semver.map(|v| v.minor() as i32),
883 patch: semver.map(|v| v.patch() as i32),
884 checksum_matched,
885 release_channel: body.release_channel.clone().unwrap_or_default(),
886 os_name: body.os_name.clone(),
887 os_version: body.os_version.clone().unwrap_or_default(),
888 installation_id: body.installation_id.clone(),
889 session_id: body.session_id.clone(),
890 is_staff: body.is_staff,
891 time: time.timestamp_millis(),
892 conversation_id: event.conversation_id.unwrap_or_default(),
893 kind: event.kind.to_string(),
894 model: event.model,
895 response_latency_in_ms: event
896 .response_latency
897 .map(|latency| latency.as_millis() as i64),
898 error_message: event.error_message,
899 }
900 }
901}
902
903#[derive(Debug, clickhouse::Row, Serialize)]
904pub struct CpuEventRow {
905 installation_id: Option<String>,
906 is_staff: Option<bool>,
907 usage_as_percentage: f32,
908 core_count: u32,
909 app_version: String,
910 release_channel: String,
911 os_name: String,
912 os_version: String,
913 time: i64,
914 session_id: Option<String>,
915 // pub normalized_cpu_usage: f64, MATERIALIZED
916 major: Option<i32>,
917 minor: Option<i32>,
918 patch: Option<i32>,
919 checksum_matched: bool,
920}
921
922impl CpuEventRow {
923 fn from_event(
924 event: CpuEvent,
925 wrapper: &EventWrapper,
926 body: &EventRequestBody,
927 first_event_at: chrono::DateTime<chrono::Utc>,
928 checksum_matched: bool,
929 ) -> Self {
930 let semver = body.semver();
931 let time =
932 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
933
934 Self {
935 app_version: body.app_version.clone(),
936 major: semver.map(|v| v.major() as i32),
937 minor: semver.map(|v| v.minor() as i32),
938 patch: semver.map(|v| v.patch() as i32),
939 checksum_matched,
940 release_channel: body.release_channel.clone().unwrap_or_default(),
941 os_name: body.os_name.clone(),
942 os_version: body.os_version.clone().unwrap_or_default(),
943 installation_id: body.installation_id.clone(),
944 session_id: body.session_id.clone(),
945 is_staff: body.is_staff,
946 time: time.timestamp_millis(),
947 usage_as_percentage: event.usage_as_percentage,
948 core_count: event.core_count,
949 }
950 }
951}
952
953#[derive(Serialize, Debug, clickhouse::Row)]
954pub struct MemoryEventRow {
955 // AppInfoBase
956 app_version: String,
957 major: Option<i32>,
958 minor: Option<i32>,
959 patch: Option<i32>,
960 checksum_matched: bool,
961 release_channel: String,
962 os_name: String,
963 os_version: String,
964
965 // ClientEventBase
966 installation_id: Option<String>,
967 session_id: Option<String>,
968 is_staff: Option<bool>,
969 time: i64,
970
971 // MemoryEventRow
972 memory_in_bytes: u64,
973 virtual_memory_in_bytes: u64,
974}
975
976impl MemoryEventRow {
977 fn from_event(
978 event: MemoryEvent,
979 wrapper: &EventWrapper,
980 body: &EventRequestBody,
981 first_event_at: chrono::DateTime<chrono::Utc>,
982 checksum_matched: bool,
983 ) -> Self {
984 let semver = body.semver();
985 let time =
986 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
987
988 Self {
989 app_version: body.app_version.clone(),
990 major: semver.map(|v| v.major() as i32),
991 minor: semver.map(|v| v.minor() as i32),
992 patch: semver.map(|v| v.patch() as i32),
993 checksum_matched,
994 release_channel: body.release_channel.clone().unwrap_or_default(),
995 os_name: body.os_name.clone(),
996 os_version: body.os_version.clone().unwrap_or_default(),
997 installation_id: body.installation_id.clone(),
998 session_id: body.session_id.clone(),
999 is_staff: body.is_staff,
1000 time: time.timestamp_millis(),
1001 memory_in_bytes: event.memory_in_bytes,
1002 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1003 }
1004 }
1005}
1006
1007#[derive(Serialize, Debug, clickhouse::Row)]
1008pub struct AppEventRow {
1009 // AppInfoBase
1010 app_version: String,
1011 major: Option<i32>,
1012 minor: Option<i32>,
1013 patch: Option<i32>,
1014 checksum_matched: bool,
1015 release_channel: String,
1016 os_name: String,
1017 os_version: String,
1018
1019 // ClientEventBase
1020 installation_id: Option<String>,
1021 session_id: Option<String>,
1022 is_staff: Option<bool>,
1023 time: i64,
1024
1025 // AppEventRow
1026 operation: String,
1027}
1028
1029impl AppEventRow {
1030 fn from_event(
1031 event: AppEvent,
1032 wrapper: &EventWrapper,
1033 body: &EventRequestBody,
1034 first_event_at: chrono::DateTime<chrono::Utc>,
1035 checksum_matched: bool,
1036 ) -> Self {
1037 let semver = body.semver();
1038 let time =
1039 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1040
1041 Self {
1042 app_version: body.app_version.clone(),
1043 major: semver.map(|v| v.major() as i32),
1044 minor: semver.map(|v| v.minor() as i32),
1045 patch: semver.map(|v| v.patch() as i32),
1046 checksum_matched,
1047 release_channel: body.release_channel.clone().unwrap_or_default(),
1048 os_name: body.os_name.clone(),
1049 os_version: body.os_version.clone().unwrap_or_default(),
1050 installation_id: body.installation_id.clone(),
1051 session_id: body.session_id.clone(),
1052 is_staff: body.is_staff,
1053 time: time.timestamp_millis(),
1054 operation: event.operation,
1055 }
1056 }
1057}
1058
1059#[derive(Serialize, Debug, clickhouse::Row)]
1060pub struct SettingEventRow {
1061 // AppInfoBase
1062 app_version: String,
1063 major: Option<i32>,
1064 minor: Option<i32>,
1065 patch: Option<i32>,
1066 checksum_matched: bool,
1067 release_channel: String,
1068 os_name: String,
1069 os_version: String,
1070
1071 // ClientEventBase
1072 installation_id: Option<String>,
1073 session_id: Option<String>,
1074 is_staff: Option<bool>,
1075 time: i64,
1076 // SettingEventRow
1077 setting: String,
1078 value: String,
1079}
1080
1081impl SettingEventRow {
1082 fn from_event(
1083 event: SettingEvent,
1084 wrapper: &EventWrapper,
1085 body: &EventRequestBody,
1086 first_event_at: chrono::DateTime<chrono::Utc>,
1087 checksum_matched: bool,
1088 ) -> Self {
1089 let semver = body.semver();
1090 let time =
1091 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1092
1093 Self {
1094 app_version: body.app_version.clone(),
1095 major: semver.map(|v| v.major() as i32),
1096 minor: semver.map(|v| v.minor() as i32),
1097 checksum_matched,
1098 patch: semver.map(|v| v.patch() as i32),
1099 release_channel: body.release_channel.clone().unwrap_or_default(),
1100 os_name: body.os_name.clone(),
1101 os_version: body.os_version.clone().unwrap_or_default(),
1102 installation_id: body.installation_id.clone(),
1103 session_id: body.session_id.clone(),
1104 is_staff: body.is_staff,
1105 time: time.timestamp_millis(),
1106 setting: event.setting,
1107 value: event.value,
1108 }
1109 }
1110}
1111
1112#[derive(Serialize, Debug, clickhouse::Row)]
1113pub struct ExtensionEventRow {
1114 // AppInfoBase
1115 app_version: String,
1116 major: Option<i32>,
1117 minor: Option<i32>,
1118 patch: Option<i32>,
1119 checksum_matched: bool,
1120 release_channel: String,
1121 os_name: String,
1122 os_version: String,
1123
1124 // ClientEventBase
1125 installation_id: Option<String>,
1126 session_id: Option<String>,
1127 is_staff: Option<bool>,
1128 time: i64,
1129
1130 // ExtensionEventRow
1131 extension_id: Arc<str>,
1132 extension_version: Arc<str>,
1133 dev: bool,
1134 schema_version: Option<i32>,
1135 wasm_api_version: Option<String>,
1136}
1137
1138impl ExtensionEventRow {
1139 fn from_event(
1140 event: ExtensionEvent,
1141 wrapper: &EventWrapper,
1142 body: &EventRequestBody,
1143 extension_metadata: Option<ExtensionMetadata>,
1144 first_event_at: chrono::DateTime<chrono::Utc>,
1145 checksum_matched: bool,
1146 ) -> Self {
1147 let semver = body.semver();
1148 let time =
1149 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1150
1151 Self {
1152 app_version: body.app_version.clone(),
1153 major: semver.map(|v| v.major() as i32),
1154 minor: semver.map(|v| v.minor() as i32),
1155 patch: semver.map(|v| v.patch() as i32),
1156 checksum_matched,
1157 release_channel: body.release_channel.clone().unwrap_or_default(),
1158 os_name: body.os_name.clone(),
1159 os_version: body.os_version.clone().unwrap_or_default(),
1160 installation_id: body.installation_id.clone(),
1161 session_id: body.session_id.clone(),
1162 is_staff: body.is_staff,
1163 time: time.timestamp_millis(),
1164 extension_id: event.extension_id,
1165 extension_version: event.version,
1166 dev: extension_metadata.is_none(),
1167 schema_version: extension_metadata
1168 .as_ref()
1169 .and_then(|metadata| metadata.manifest.schema_version),
1170 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1171 metadata
1172 .manifest
1173 .wasm_api_version
1174 .as_ref()
1175 .map(|version| version.to_string())
1176 }),
1177 }
1178 }
1179}
1180
1181#[derive(Serialize, Debug, clickhouse::Row)]
1182pub struct ReplEventRow {
1183 // AppInfoBase
1184 app_version: String,
1185 major: Option<i32>,
1186 minor: Option<i32>,
1187 patch: Option<i32>,
1188 checksum_matched: bool,
1189 release_channel: String,
1190 os_name: String,
1191 os_version: String,
1192
1193 // ClientEventBase
1194 installation_id: Option<String>,
1195 session_id: Option<String>,
1196 is_staff: Option<bool>,
1197 time: i64,
1198
1199 // ReplEventRow
1200 kernel_language: String,
1201 kernel_status: String,
1202 repl_session_id: String,
1203}
1204
1205impl ReplEventRow {
1206 fn from_event(
1207 event: ReplEvent,
1208 wrapper: &EventWrapper,
1209 body: &EventRequestBody,
1210 first_event_at: chrono::DateTime<chrono::Utc>,
1211 checksum_matched: bool,
1212 ) -> Self {
1213 let semver = body.semver();
1214 let time =
1215 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1216
1217 Self {
1218 app_version: body.app_version.clone(),
1219 major: semver.map(|v| v.major() as i32),
1220 minor: semver.map(|v| v.minor() as i32),
1221 patch: semver.map(|v| v.patch() as i32),
1222 checksum_matched,
1223 release_channel: body.release_channel.clone().unwrap_or_default(),
1224 os_name: body.os_name.clone(),
1225 os_version: body.os_version.clone().unwrap_or_default(),
1226 installation_id: body.installation_id.clone(),
1227 session_id: body.session_id.clone(),
1228 is_staff: body.is_staff,
1229 time: time.timestamp_millis(),
1230 kernel_language: event.kernel_language,
1231 kernel_status: event.kernel_status,
1232 repl_session_id: event.repl_session_id,
1233 }
1234 }
1235}
1236
1237#[derive(Serialize, Debug, clickhouse::Row)]
1238pub struct EditEventRow {
1239 // AppInfoBase
1240 app_version: String,
1241 major: Option<i32>,
1242 minor: Option<i32>,
1243 patch: Option<i32>,
1244 checksum_matched: bool,
1245 release_channel: String,
1246 os_name: String,
1247 os_version: String,
1248
1249 // ClientEventBase
1250 installation_id: Option<String>,
1251 // Note: This column name has a typo in the ClickHouse table.
1252 #[serde(rename = "sesssion_id")]
1253 session_id: Option<String>,
1254 is_staff: Option<bool>,
1255 time: i64,
1256
1257 // EditEventRow
1258 period_start: i64,
1259 period_end: i64,
1260 environment: String,
1261}
1262
1263impl EditEventRow {
1264 fn from_event(
1265 event: EditEvent,
1266 wrapper: &EventWrapper,
1267 body: &EventRequestBody,
1268 first_event_at: chrono::DateTime<chrono::Utc>,
1269 checksum_matched: bool,
1270 ) -> Self {
1271 let semver = body.semver();
1272 let time =
1273 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1274
1275 let period_start = time - chrono::Duration::milliseconds(event.duration);
1276 let period_end = time;
1277
1278 Self {
1279 app_version: body.app_version.clone(),
1280 major: semver.map(|v| v.major() as i32),
1281 minor: semver.map(|v| v.minor() as i32),
1282 patch: semver.map(|v| v.patch() as i32),
1283 checksum_matched,
1284 release_channel: body.release_channel.clone().unwrap_or_default(),
1285 os_name: body.os_name.clone(),
1286 os_version: body.os_version.clone().unwrap_or_default(),
1287 installation_id: body.installation_id.clone(),
1288 session_id: body.session_id.clone(),
1289 is_staff: body.is_staff,
1290 time: time.timestamp_millis(),
1291 period_start: period_start.timestamp_millis(),
1292 period_end: period_end.timestamp_millis(),
1293 environment: event.environment,
1294 }
1295 }
1296}
1297
1298#[derive(Serialize, Debug, clickhouse::Row)]
1299pub struct ActionEventRow {
1300 // AppInfoBase
1301 app_version: String,
1302 major: Option<i32>,
1303 minor: Option<i32>,
1304 patch: Option<i32>,
1305 checksum_matched: bool,
1306 release_channel: String,
1307 os_name: String,
1308 os_version: String,
1309
1310 // ClientEventBase
1311 installation_id: Option<String>,
1312 // Note: This column name has a typo in the ClickHouse table.
1313 #[serde(rename = "sesssion_id")]
1314 session_id: Option<String>,
1315 is_staff: Option<bool>,
1316 time: i64,
1317 // ActionEventRow
1318 source: String,
1319 action: String,
1320}
1321
1322impl ActionEventRow {
1323 fn from_event(
1324 event: ActionEvent,
1325 wrapper: &EventWrapper,
1326 body: &EventRequestBody,
1327 first_event_at: chrono::DateTime<chrono::Utc>,
1328 checksum_matched: bool,
1329 ) -> Self {
1330 let semver = body.semver();
1331 let time =
1332 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1333
1334 Self {
1335 app_version: body.app_version.clone(),
1336 major: semver.map(|v| v.major() as i32),
1337 minor: semver.map(|v| v.minor() as i32),
1338 patch: semver.map(|v| v.patch() as i32),
1339 checksum_matched,
1340 release_channel: body.release_channel.clone().unwrap_or_default(),
1341 os_name: body.os_name.clone(),
1342 os_version: body.os_version.clone().unwrap_or_default(),
1343 installation_id: body.installation_id.clone(),
1344 session_id: body.session_id.clone(),
1345 is_staff: body.is_staff,
1346 time: time.timestamp_millis(),
1347 source: event.source,
1348 action: event.action,
1349 }
1350 }
1351}
1352
1353pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1354 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1355 return None;
1356 };
1357
1358 let mut summer = Sha256::new();
1359 summer.update(checksum_seed);
1360 summer.update(&json);
1361 summer.update(checksum_seed);
1362 Some(summer.finalize().into_iter().collect())
1363}