1use super::ips_file::IpsFile;
2use crate::api::CloudflareIpCountryHeader;
3use crate::clickhouse::write_to_table;
4use crate::{api::slack, AppState, Error, Result};
5use anyhow::{anyhow, Context};
6use aws_sdk_s3::primitives::ByteStream;
7use axum::{
8 body::Bytes,
9 headers::Header,
10 http::{HeaderMap, HeaderName, StatusCode},
11 routing::post,
12 Extension, Router, TypedHeader,
13};
14use rpc::ExtensionMetadata;
15use semantic_version::SemanticVersion;
16use serde::{Serialize, Serializer};
17use sha2::{Digest, Sha256};
18use std::sync::{Arc, OnceLock};
19use telemetry_events::{
20 ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
21 EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
22 SettingEvent,
23};
24use uuid::Uuid;
25
26static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
27
28pub fn router() -> Router {
29 Router::new()
30 .route("/telemetry/events", post(post_events))
31 .route("/telemetry/crashes", post(post_crash))
32 .route("/telemetry/panics", post(post_panic))
33 .route("/telemetry/hangs", post(post_hang))
34}
35
36pub struct ZedChecksumHeader(Vec<u8>);
37
38impl Header for ZedChecksumHeader {
39 fn name() -> &'static HeaderName {
40 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
41 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
42 }
43
44 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
45 where
46 Self: Sized,
47 I: Iterator<Item = &'i axum::http::HeaderValue>,
48 {
49 let checksum = values
50 .next()
51 .ok_or_else(axum::headers::Error::invalid)?
52 .to_str()
53 .map_err(|_| axum::headers::Error::invalid())?;
54
55 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
56 Ok(Self(bytes))
57 }
58
59 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
60 unimplemented!()
61 }
62}
63
64pub async fn post_crash(
65 Extension(app): Extension<Arc<AppState>>,
66 headers: HeaderMap,
67 body: Bytes,
68) -> Result<()> {
69 let report = IpsFile::parse(&body)?;
70 let version_threshold = SemanticVersion::new(0, 123, 0);
71
72 let bundle_id = &report.header.bundle_id;
73 let app_version = &report.app_version();
74
75 if bundle_id == "dev.zed.Zed-Dev" {
76 log::error!("Crash uploads from {} are ignored.", bundle_id);
77 return Ok(());
78 }
79
80 if app_version.is_none() || app_version.unwrap() < version_threshold {
81 log::error!(
82 "Crash uploads from {} are ignored.",
83 report.header.app_version
84 );
85 return Ok(());
86 }
87 let app_version = app_version.unwrap();
88
89 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
90 let response = blob_store_client
91 .head_object()
92 .bucket(CRASH_REPORTS_BUCKET)
93 .key(report.header.incident_id.clone() + ".ips")
94 .send()
95 .await;
96
97 if response.is_ok() {
98 log::info!("We've already uploaded this crash");
99 return Ok(());
100 }
101
102 blob_store_client
103 .put_object()
104 .bucket(CRASH_REPORTS_BUCKET)
105 .key(report.header.incident_id.clone() + ".ips")
106 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
107 .body(ByteStream::from(body.to_vec()))
108 .send()
109 .await
110 .map_err(|e| log::error!("Failed to upload crash: {}", e))
111 .ok();
112 }
113
114 let recent_panic_on: Option<i64> = headers
115 .get("x-zed-panicked-on")
116 .and_then(|h| h.to_str().ok())
117 .and_then(|s| s.parse().ok());
118
119 let installation_id = headers
120 .get("x-zed-installation-id")
121 .and_then(|h| h.to_str().ok())
122 .map(|s| s.to_string())
123 .unwrap_or_default();
124
125 let mut recent_panic = None;
126
127 if let Some(recent_panic_on) = recent_panic_on {
128 let crashed_at = match report.timestamp() {
129 Ok(t) => Some(t),
130 Err(e) => {
131 log::error!("Can't parse {}: {}", report.header.timestamp, e);
132 None
133 }
134 };
135 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
136 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
137 }
138 }
139
140 let description = report.description(recent_panic);
141 let summary = report.backtrace_summary();
142
143 tracing::error!(
144 service = "client",
145 version = %report.header.app_version,
146 os_version = %report.header.os_version,
147 bundle_id = %report.header.bundle_id,
148 incident_id = %report.header.incident_id,
149 installation_id = %installation_id,
150 description = %description,
151 backtrace = %summary,
152 "crash report"
153 );
154
155 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
156 let payload = slack::WebhookBody::new(|w| {
157 w.add_section(|s| s.text(slack::Text::markdown(description)))
158 .add_section(|s| {
159 s.add_field(slack::Text::markdown(format!(
160 "*Version:*\n{} ({})",
161 bundle_id, app_version
162 )))
163 .add_field({
164 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
165 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
166 hostname.strip_prefix("http://").unwrap_or_default()
167 });
168
169 slack::Text::markdown(format!(
170 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
171 CRASH_REPORTS_BUCKET,
172 hostname,
173 report.header.incident_id,
174 report
175 .header
176 .incident_id
177 .chars()
178 .take(8)
179 .collect::<String>(),
180 ))
181 })
182 })
183 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
184 });
185 let payload_json = serde_json::to_string(&payload).map_err(|err| {
186 log::error!("Failed to serialize payload to JSON: {err}");
187 Error::Internal(anyhow!(err))
188 })?;
189
190 reqwest::Client::new()
191 .post(slack_panics_webhook)
192 .header("Content-Type", "application/json")
193 .body(payload_json)
194 .send()
195 .await
196 .map_err(|err| {
197 log::error!("Failed to send payload to Slack: {err}");
198 Error::Internal(anyhow!(err))
199 })?;
200 }
201
202 Ok(())
203}
204
205pub async fn post_hang(
206 Extension(app): Extension<Arc<AppState>>,
207 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
208 body: Bytes,
209) -> Result<()> {
210 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
211 return Err(Error::http(
212 StatusCode::INTERNAL_SERVER_ERROR,
213 "events not enabled".into(),
214 ))?;
215 };
216
217 if checksum != expected {
218 return Err(Error::http(
219 StatusCode::BAD_REQUEST,
220 "invalid checksum".into(),
221 ))?;
222 }
223
224 let incident_id = Uuid::new_v4().to_string();
225
226 // dump JSON into S3 so we can get frame offsets if we need to.
227 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
228 blob_store_client
229 .put_object()
230 .bucket(CRASH_REPORTS_BUCKET)
231 .key(incident_id.clone() + ".hang.json")
232 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
233 .body(ByteStream::from(body.to_vec()))
234 .send()
235 .await
236 .map_err(|e| log::error!("Failed to upload crash: {}", e))
237 .ok();
238 }
239
240 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
241 log::error!("can't parse report json: {err}");
242 Error::Internal(anyhow!(err))
243 })?;
244
245 let mut backtrace = "Possible hang detected on main thread:".to_string();
246 let unknown = "<unknown>".to_string();
247 for frame in report.backtrace.iter() {
248 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
249 }
250
251 tracing::error!(
252 service = "client",
253 version = %report.app_version.unwrap_or_default().to_string(),
254 os_name = %report.os_name,
255 os_version = report.os_version.unwrap_or_default().to_string(),
256 incident_id = %incident_id,
257 installation_id = %report.installation_id.unwrap_or_default(),
258 backtrace = %backtrace,
259 "hang report");
260
261 Ok(())
262}
263
264pub async fn post_panic(
265 Extension(app): Extension<Arc<AppState>>,
266 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
267 body: Bytes,
268) -> Result<()> {
269 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
270 return Err(Error::http(
271 StatusCode::INTERNAL_SERVER_ERROR,
272 "events not enabled".into(),
273 ))?;
274 };
275
276 if checksum != expected {
277 return Err(Error::http(
278 StatusCode::BAD_REQUEST,
279 "invalid checksum".into(),
280 ))?;
281 }
282
283 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
284 .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
285 let panic = report.panic;
286
287 if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
288 return Err(Error::http(
289 StatusCode::BAD_REQUEST,
290 "invalid os version".into(),
291 ))?;
292 }
293
294 tracing::error!(
295 service = "client",
296 version = %panic.app_version,
297 os_name = %panic.os_name,
298 os_version = %panic.os_version.clone().unwrap_or_default(),
299 installation_id = %panic.installation_id.unwrap_or_default(),
300 description = %panic.payload,
301 backtrace = %panic.backtrace.join("\n"),
302 "panic report");
303
304 let backtrace = if panic.backtrace.len() > 25 {
305 let total = panic.backtrace.len();
306 format!(
307 "{}\n and {} more",
308 panic
309 .backtrace
310 .iter()
311 .take(20)
312 .cloned()
313 .collect::<Vec<_>>()
314 .join("\n"),
315 total - 20
316 )
317 } else {
318 panic.backtrace.join("\n")
319 };
320 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
321
322 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
323 let payload = slack::WebhookBody::new(|w| {
324 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
325 .add_section(|s| {
326 s.add_field(slack::Text::markdown(format!(
327 "*Version:*\n {} ",
328 panic.app_version
329 )))
330 .add_field({
331 slack::Text::markdown(format!(
332 "*OS:*\n{} {}",
333 panic.os_name,
334 panic.os_version.unwrap_or_default()
335 ))
336 })
337 })
338 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
339 });
340 let payload_json = serde_json::to_string(&payload).map_err(|err| {
341 log::error!("Failed to serialize payload to JSON: {err}");
342 Error::Internal(anyhow!(err))
343 })?;
344
345 reqwest::Client::new()
346 .post(slack_panics_webhook)
347 .header("Content-Type", "application/json")
348 .body(payload_json)
349 .send()
350 .await
351 .map_err(|err| {
352 log::error!("Failed to send payload to Slack: {err}");
353 Error::Internal(anyhow!(err))
354 })?;
355 }
356
357 Ok(())
358}
359
360pub async fn post_events(
361 Extension(app): Extension<Arc<AppState>>,
362 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
363 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
364 body: Bytes,
365) -> Result<()> {
366 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
367 Err(Error::http(
368 StatusCode::NOT_IMPLEMENTED,
369 "not supported".into(),
370 ))?
371 };
372
373 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
374 return Err(Error::http(
375 StatusCode::INTERNAL_SERVER_ERROR,
376 "events not enabled".into(),
377 ))?;
378 };
379
380 let checksum_matched = checksum == expected;
381
382 let request_body: telemetry_events::EventRequestBody =
383 serde_json::from_slice(&body).map_err(|err| {
384 log::error!("can't parse event json: {err}");
385 Error::Internal(anyhow!(err))
386 })?;
387
388 let mut to_upload = ToUpload::default();
389 let Some(last_event) = request_body.events.last() else {
390 return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
391 };
392 let country_code = country_code_header.map(|h| h.to_string());
393
394 let first_event_at = chrono::Utc::now()
395 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
396
397 for wrapper in &request_body.events {
398 match &wrapper.event {
399 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
400 event.clone(),
401 wrapper,
402 &request_body,
403 first_event_at,
404 country_code.clone(),
405 checksum_matched,
406 )),
407 // Needed for clients sending old copilot_event types
408 Event::Copilot(_) => {}
409 Event::InlineCompletion(event) => {
410 to_upload
411 .inline_completion_events
412 .push(InlineCompletionEventRow::from_event(
413 event.clone(),
414 wrapper,
415 &request_body,
416 first_event_at,
417 country_code.clone(),
418 checksum_matched,
419 ))
420 }
421 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
422 event.clone(),
423 wrapper,
424 &request_body,
425 first_event_at,
426 checksum_matched,
427 )),
428 Event::Assistant(event) => {
429 to_upload
430 .assistant_events
431 .push(AssistantEventRow::from_event(
432 event.clone(),
433 wrapper,
434 &request_body,
435 first_event_at,
436 checksum_matched,
437 ))
438 }
439 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
440 event.clone(),
441 wrapper,
442 &request_body,
443 first_event_at,
444 checksum_matched,
445 )),
446 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
447 event.clone(),
448 wrapper,
449 &request_body,
450 first_event_at,
451 checksum_matched,
452 )),
453 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
454 event.clone(),
455 wrapper,
456 &request_body,
457 first_event_at,
458 checksum_matched,
459 )),
460 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
461 event.clone(),
462 wrapper,
463 &request_body,
464 first_event_at,
465 checksum_matched,
466 )),
467 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
468 event.clone(),
469 wrapper,
470 &request_body,
471 first_event_at,
472 checksum_matched,
473 )),
474 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
475 event.clone(),
476 wrapper,
477 &request_body,
478 first_event_at,
479 checksum_matched,
480 )),
481 Event::Extension(event) => {
482 let metadata = app
483 .db
484 .get_extension_version(&event.extension_id, &event.version)
485 .await?;
486 to_upload
487 .extension_events
488 .push(ExtensionEventRow::from_event(
489 event.clone(),
490 wrapper,
491 &request_body,
492 metadata,
493 first_event_at,
494 checksum_matched,
495 ))
496 }
497 Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
498 event.clone(),
499 wrapper,
500 &request_body,
501 first_event_at,
502 checksum_matched,
503 )),
504 }
505 }
506
507 to_upload
508 .upload(&clickhouse_client)
509 .await
510 .map_err(|err| Error::Internal(anyhow!(err)))?;
511
512 Ok(())
513}
514
515#[derive(Default)]
516struct ToUpload {
517 editor_events: Vec<EditorEventRow>,
518 inline_completion_events: Vec<InlineCompletionEventRow>,
519 assistant_events: Vec<AssistantEventRow>,
520 call_events: Vec<CallEventRow>,
521 cpu_events: Vec<CpuEventRow>,
522 memory_events: Vec<MemoryEventRow>,
523 app_events: Vec<AppEventRow>,
524 setting_events: Vec<SettingEventRow>,
525 extension_events: Vec<ExtensionEventRow>,
526 edit_events: Vec<EditEventRow>,
527 action_events: Vec<ActionEventRow>,
528 repl_events: Vec<ReplEventRow>,
529}
530
531impl ToUpload {
532 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
533 const EDITOR_EVENTS_TABLE: &str = "editor_events";
534 write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
535 .await
536 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
537
538 const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
539 write_to_table(
540 INLINE_COMPLETION_EVENTS_TABLE,
541 &self.inline_completion_events,
542 clickhouse_client,
543 )
544 .await
545 .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
546
547 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
548 write_to_table(
549 ASSISTANT_EVENTS_TABLE,
550 &self.assistant_events,
551 clickhouse_client,
552 )
553 .await
554 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
555
556 const CALL_EVENTS_TABLE: &str = "call_events";
557 write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
558 .await
559 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
560
561 const CPU_EVENTS_TABLE: &str = "cpu_events";
562 write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
563 .await
564 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
565
566 const MEMORY_EVENTS_TABLE: &str = "memory_events";
567 write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
568 .await
569 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
570
571 const APP_EVENTS_TABLE: &str = "app_events";
572 write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
573 .await
574 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
575
576 const SETTING_EVENTS_TABLE: &str = "setting_events";
577 write_to_table(
578 SETTING_EVENTS_TABLE,
579 &self.setting_events,
580 clickhouse_client,
581 )
582 .await
583 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
584
585 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
586 write_to_table(
587 EXTENSION_EVENTS_TABLE,
588 &self.extension_events,
589 clickhouse_client,
590 )
591 .await
592 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
593
594 const EDIT_EVENTS_TABLE: &str = "edit_events";
595 write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
596 .await
597 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
598
599 const ACTION_EVENTS_TABLE: &str = "action_events";
600 write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
601 .await
602 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
603
604 const REPL_EVENTS_TABLE: &str = "repl_events";
605 write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
606 .await
607 .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
608
609 Ok(())
610 }
611}
612
613pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
614where
615 S: Serializer,
616{
617 if country_code.len() != 2 {
618 use serde::ser::Error;
619 return Err(S::Error::custom(
620 "country_code must be exactly 2 characters",
621 ));
622 }
623
624 let country_code = country_code.as_bytes();
625
626 serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
627}
628
629#[derive(Serialize, Debug, clickhouse::Row)]
630pub struct EditorEventRow {
631 system_id: String,
632 installation_id: String,
633 session_id: Option<String>,
634 metrics_id: String,
635 operation: String,
636 app_version: String,
637 file_extension: String,
638 os_name: String,
639 os_version: String,
640 release_channel: String,
641 signed_in: bool,
642 vim_mode: bool,
643 #[serde(serialize_with = "serialize_country_code")]
644 country_code: String,
645 region_code: String,
646 city: String,
647 time: i64,
648 copilot_enabled: bool,
649 copilot_enabled_for_language: bool,
650 historical_event: bool,
651 architecture: String,
652 is_staff: Option<bool>,
653 major: Option<i32>,
654 minor: Option<i32>,
655 patch: Option<i32>,
656 checksum_matched: bool,
657}
658
659impl EditorEventRow {
660 fn from_event(
661 event: EditorEvent,
662 wrapper: &EventWrapper,
663 body: &EventRequestBody,
664 first_event_at: chrono::DateTime<chrono::Utc>,
665 country_code: Option<String>,
666 checksum_matched: bool,
667 ) -> Self {
668 let semver = body.semver();
669 let time =
670 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
671
672 Self {
673 app_version: body.app_version.clone(),
674 major: semver.map(|v| v.major() as i32),
675 minor: semver.map(|v| v.minor() as i32),
676 patch: semver.map(|v| v.patch() as i32),
677 checksum_matched,
678 release_channel: body.release_channel.clone().unwrap_or_default(),
679 os_name: body.os_name.clone(),
680 os_version: body.os_version.clone().unwrap_or_default(),
681 architecture: body.architecture.clone(),
682 system_id: body.system_id.clone().unwrap_or_default(),
683 installation_id: body.installation_id.clone().unwrap_or_default(),
684 session_id: body.session_id.clone(),
685 metrics_id: body.metrics_id.clone().unwrap_or_default(),
686 is_staff: body.is_staff,
687 time: time.timestamp_millis(),
688 operation: event.operation,
689 file_extension: event.file_extension.unwrap_or_default(),
690 signed_in: wrapper.signed_in,
691 vim_mode: event.vim_mode,
692 copilot_enabled: event.copilot_enabled,
693 copilot_enabled_for_language: event.copilot_enabled_for_language,
694 country_code: country_code.unwrap_or("XX".to_string()),
695 region_code: "".to_string(),
696 city: "".to_string(),
697 historical_event: false,
698 }
699 }
700}
701
702#[derive(Serialize, Debug, clickhouse::Row)]
703pub struct InlineCompletionEventRow {
704 installation_id: String,
705 session_id: Option<String>,
706 provider: String,
707 suggestion_accepted: bool,
708 app_version: String,
709 file_extension: String,
710 os_name: String,
711 os_version: String,
712 release_channel: String,
713 signed_in: bool,
714 #[serde(serialize_with = "serialize_country_code")]
715 country_code: String,
716 region_code: String,
717 city: String,
718 time: i64,
719 is_staff: Option<bool>,
720 major: Option<i32>,
721 minor: Option<i32>,
722 patch: Option<i32>,
723 checksum_matched: bool,
724}
725
726impl InlineCompletionEventRow {
727 fn from_event(
728 event: InlineCompletionEvent,
729 wrapper: &EventWrapper,
730 body: &EventRequestBody,
731 first_event_at: chrono::DateTime<chrono::Utc>,
732 country_code: Option<String>,
733 checksum_matched: bool,
734 ) -> Self {
735 let semver = body.semver();
736 let time =
737 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
738
739 Self {
740 app_version: body.app_version.clone(),
741 major: semver.map(|v| v.major() as i32),
742 minor: semver.map(|v| v.minor() as i32),
743 patch: semver.map(|v| v.patch() as i32),
744 checksum_matched,
745 release_channel: body.release_channel.clone().unwrap_or_default(),
746 os_name: body.os_name.clone(),
747 os_version: body.os_version.clone().unwrap_or_default(),
748 installation_id: body.installation_id.clone().unwrap_or_default(),
749 session_id: body.session_id.clone(),
750 is_staff: body.is_staff,
751 time: time.timestamp_millis(),
752 file_extension: event.file_extension.unwrap_or_default(),
753 signed_in: wrapper.signed_in,
754 country_code: country_code.unwrap_or("XX".to_string()),
755 region_code: "".to_string(),
756 city: "".to_string(),
757 provider: event.provider,
758 suggestion_accepted: event.suggestion_accepted,
759 }
760 }
761}
762
763#[derive(Serialize, Debug, clickhouse::Row)]
764pub struct CallEventRow {
765 // AppInfoBase
766 app_version: String,
767 major: Option<i32>,
768 minor: Option<i32>,
769 patch: Option<i32>,
770 release_channel: String,
771 os_name: String,
772 os_version: String,
773 checksum_matched: bool,
774
775 // ClientEventBase
776 installation_id: String,
777 session_id: Option<String>,
778 is_staff: Option<bool>,
779 time: i64,
780
781 // CallEventRow
782 operation: String,
783 room_id: Option<u64>,
784 channel_id: Option<u64>,
785}
786
787impl CallEventRow {
788 fn from_event(
789 event: CallEvent,
790 wrapper: &EventWrapper,
791 body: &EventRequestBody,
792 first_event_at: chrono::DateTime<chrono::Utc>,
793 checksum_matched: bool,
794 ) -> Self {
795 let semver = body.semver();
796 let time =
797 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
798
799 Self {
800 app_version: body.app_version.clone(),
801 major: semver.map(|v| v.major() as i32),
802 minor: semver.map(|v| v.minor() as i32),
803 patch: semver.map(|v| v.patch() as i32),
804 checksum_matched,
805 release_channel: body.release_channel.clone().unwrap_or_default(),
806 os_name: body.os_name.clone(),
807 os_version: body.os_version.clone().unwrap_or_default(),
808 installation_id: body.installation_id.clone().unwrap_or_default(),
809 session_id: body.session_id.clone(),
810 is_staff: body.is_staff,
811 time: time.timestamp_millis(),
812 operation: event.operation,
813 room_id: event.room_id,
814 channel_id: event.channel_id,
815 }
816 }
817}
818
819#[derive(Serialize, Debug, clickhouse::Row)]
820pub struct AssistantEventRow {
821 // AppInfoBase
822 app_version: String,
823 major: Option<i32>,
824 minor: Option<i32>,
825 patch: Option<i32>,
826 checksum_matched: bool,
827 release_channel: String,
828 os_name: String,
829 os_version: String,
830
831 // ClientEventBase
832 installation_id: Option<String>,
833 session_id: Option<String>,
834 is_staff: Option<bool>,
835 time: i64,
836
837 // AssistantEventRow
838 conversation_id: String,
839 kind: String,
840 phase: String,
841 model: String,
842 response_latency_in_ms: Option<i64>,
843 error_message: Option<String>,
844}
845
846impl AssistantEventRow {
847 fn from_event(
848 event: AssistantEvent,
849 wrapper: &EventWrapper,
850 body: &EventRequestBody,
851 first_event_at: chrono::DateTime<chrono::Utc>,
852 checksum_matched: bool,
853 ) -> Self {
854 let semver = body.semver();
855 let time =
856 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
857
858 Self {
859 app_version: body.app_version.clone(),
860 major: semver.map(|v| v.major() as i32),
861 minor: semver.map(|v| v.minor() as i32),
862 patch: semver.map(|v| v.patch() as i32),
863 checksum_matched,
864 release_channel: body.release_channel.clone().unwrap_or_default(),
865 os_name: body.os_name.clone(),
866 os_version: body.os_version.clone().unwrap_or_default(),
867 installation_id: body.installation_id.clone(),
868 session_id: body.session_id.clone(),
869 is_staff: body.is_staff,
870 time: time.timestamp_millis(),
871 conversation_id: event.conversation_id.unwrap_or_default(),
872 kind: event.kind.to_string(),
873 phase: event.phase.to_string(),
874 model: event.model,
875 response_latency_in_ms: event
876 .response_latency
877 .map(|latency| latency.as_millis() as i64),
878 error_message: event.error_message,
879 }
880 }
881}
882
883#[derive(Debug, clickhouse::Row, Serialize)]
884pub struct CpuEventRow {
885 system_id: Option<String>,
886 installation_id: Option<String>,
887 session_id: Option<String>,
888 is_staff: Option<bool>,
889 usage_as_percentage: f32,
890 core_count: u32,
891 app_version: String,
892 release_channel: String,
893 os_name: String,
894 os_version: String,
895 time: i64,
896 // pub normalized_cpu_usage: f64, MATERIALIZED
897 major: Option<i32>,
898 minor: Option<i32>,
899 patch: Option<i32>,
900 checksum_matched: bool,
901}
902
903impl CpuEventRow {
904 fn from_event(
905 event: CpuEvent,
906 wrapper: &EventWrapper,
907 body: &EventRequestBody,
908 first_event_at: chrono::DateTime<chrono::Utc>,
909 checksum_matched: bool,
910 ) -> Self {
911 let semver = body.semver();
912 let time =
913 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
914
915 Self {
916 app_version: body.app_version.clone(),
917 major: semver.map(|v| v.major() as i32),
918 minor: semver.map(|v| v.minor() as i32),
919 patch: semver.map(|v| v.patch() as i32),
920 checksum_matched,
921 release_channel: body.release_channel.clone().unwrap_or_default(),
922 os_name: body.os_name.clone(),
923 os_version: body.os_version.clone().unwrap_or_default(),
924 system_id: body.system_id.clone(),
925 installation_id: body.installation_id.clone(),
926 session_id: body.session_id.clone(),
927 is_staff: body.is_staff,
928 time: time.timestamp_millis(),
929 usage_as_percentage: event.usage_as_percentage,
930 core_count: event.core_count,
931 }
932 }
933}
934
935#[derive(Serialize, Debug, clickhouse::Row)]
936pub struct MemoryEventRow {
937 // AppInfoBase
938 app_version: String,
939 major: Option<i32>,
940 minor: Option<i32>,
941 patch: Option<i32>,
942 checksum_matched: bool,
943 release_channel: String,
944 os_name: String,
945 os_version: String,
946
947 // ClientEventBase
948 system_id: Option<String>,
949 installation_id: Option<String>,
950 session_id: Option<String>,
951 is_staff: Option<bool>,
952 time: i64,
953
954 // MemoryEventRow
955 memory_in_bytes: u64,
956 virtual_memory_in_bytes: u64,
957}
958
959impl MemoryEventRow {
960 fn from_event(
961 event: MemoryEvent,
962 wrapper: &EventWrapper,
963 body: &EventRequestBody,
964 first_event_at: chrono::DateTime<chrono::Utc>,
965 checksum_matched: bool,
966 ) -> Self {
967 let semver = body.semver();
968 let time =
969 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
970
971 Self {
972 app_version: body.app_version.clone(),
973 major: semver.map(|v| v.major() as i32),
974 minor: semver.map(|v| v.minor() as i32),
975 patch: semver.map(|v| v.patch() as i32),
976 checksum_matched,
977 release_channel: body.release_channel.clone().unwrap_or_default(),
978 os_name: body.os_name.clone(),
979 os_version: body.os_version.clone().unwrap_or_default(),
980 system_id: body.system_id.clone(),
981 installation_id: body.installation_id.clone(),
982 session_id: body.session_id.clone(),
983 is_staff: body.is_staff,
984 time: time.timestamp_millis(),
985 memory_in_bytes: event.memory_in_bytes,
986 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
987 }
988 }
989}
990
991#[derive(Serialize, Debug, clickhouse::Row)]
992pub struct AppEventRow {
993 // AppInfoBase
994 app_version: String,
995 major: Option<i32>,
996 minor: Option<i32>,
997 patch: Option<i32>,
998 checksum_matched: bool,
999 release_channel: String,
1000 os_name: String,
1001 os_version: String,
1002
1003 // ClientEventBase
1004 system_id: Option<String>,
1005 installation_id: Option<String>,
1006 session_id: Option<String>,
1007 is_staff: Option<bool>,
1008 time: i64,
1009
1010 // AppEventRow
1011 operation: String,
1012}
1013
1014impl AppEventRow {
1015 fn from_event(
1016 event: AppEvent,
1017 wrapper: &EventWrapper,
1018 body: &EventRequestBody,
1019 first_event_at: chrono::DateTime<chrono::Utc>,
1020 checksum_matched: bool,
1021 ) -> Self {
1022 let semver = body.semver();
1023 let time =
1024 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1025
1026 Self {
1027 app_version: body.app_version.clone(),
1028 major: semver.map(|v| v.major() as i32),
1029 minor: semver.map(|v| v.minor() as i32),
1030 patch: semver.map(|v| v.patch() as i32),
1031 checksum_matched,
1032 release_channel: body.release_channel.clone().unwrap_or_default(),
1033 os_name: body.os_name.clone(),
1034 os_version: body.os_version.clone().unwrap_or_default(),
1035 system_id: body.system_id.clone(),
1036 installation_id: body.installation_id.clone(),
1037 session_id: body.session_id.clone(),
1038 is_staff: body.is_staff,
1039 time: time.timestamp_millis(),
1040 operation: event.operation,
1041 }
1042 }
1043}
1044
1045#[derive(Serialize, Debug, clickhouse::Row)]
1046pub struct SettingEventRow {
1047 // AppInfoBase
1048 app_version: String,
1049 major: Option<i32>,
1050 minor: Option<i32>,
1051 patch: Option<i32>,
1052 checksum_matched: bool,
1053 release_channel: String,
1054 os_name: String,
1055 os_version: String,
1056
1057 // ClientEventBase
1058 system_id: Option<String>,
1059 installation_id: Option<String>,
1060 session_id: Option<String>,
1061 is_staff: Option<bool>,
1062 time: i64,
1063 // SettingEventRow
1064 setting: String,
1065 value: String,
1066}
1067
1068impl SettingEventRow {
1069 fn from_event(
1070 event: SettingEvent,
1071 wrapper: &EventWrapper,
1072 body: &EventRequestBody,
1073 first_event_at: chrono::DateTime<chrono::Utc>,
1074 checksum_matched: bool,
1075 ) -> Self {
1076 let semver = body.semver();
1077 let time =
1078 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1079
1080 Self {
1081 app_version: body.app_version.clone(),
1082 major: semver.map(|v| v.major() as i32),
1083 minor: semver.map(|v| v.minor() as i32),
1084 checksum_matched,
1085 patch: semver.map(|v| v.patch() as i32),
1086 release_channel: body.release_channel.clone().unwrap_or_default(),
1087 os_name: body.os_name.clone(),
1088 os_version: body.os_version.clone().unwrap_or_default(),
1089 system_id: body.system_id.clone(),
1090 installation_id: body.installation_id.clone(),
1091 session_id: body.session_id.clone(),
1092 is_staff: body.is_staff,
1093 time: time.timestamp_millis(),
1094 setting: event.setting,
1095 value: event.value,
1096 }
1097 }
1098}
1099
1100#[derive(Serialize, Debug, clickhouse::Row)]
1101pub struct ExtensionEventRow {
1102 // AppInfoBase
1103 app_version: String,
1104 major: Option<i32>,
1105 minor: Option<i32>,
1106 patch: Option<i32>,
1107 checksum_matched: bool,
1108 release_channel: String,
1109 os_name: String,
1110 os_version: String,
1111
1112 // ClientEventBase
1113 system_id: Option<String>,
1114 installation_id: Option<String>,
1115 session_id: Option<String>,
1116 is_staff: Option<bool>,
1117 time: i64,
1118
1119 // ExtensionEventRow
1120 extension_id: Arc<str>,
1121 extension_version: Arc<str>,
1122 dev: bool,
1123 schema_version: Option<i32>,
1124 wasm_api_version: Option<String>,
1125}
1126
1127impl ExtensionEventRow {
1128 fn from_event(
1129 event: ExtensionEvent,
1130 wrapper: &EventWrapper,
1131 body: &EventRequestBody,
1132 extension_metadata: Option<ExtensionMetadata>,
1133 first_event_at: chrono::DateTime<chrono::Utc>,
1134 checksum_matched: bool,
1135 ) -> Self {
1136 let semver = body.semver();
1137 let time =
1138 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1139
1140 Self {
1141 app_version: body.app_version.clone(),
1142 major: semver.map(|v| v.major() as i32),
1143 minor: semver.map(|v| v.minor() as i32),
1144 patch: semver.map(|v| v.patch() as i32),
1145 checksum_matched,
1146 release_channel: body.release_channel.clone().unwrap_or_default(),
1147 os_name: body.os_name.clone(),
1148 os_version: body.os_version.clone().unwrap_or_default(),
1149 system_id: body.system_id.clone(),
1150 installation_id: body.installation_id.clone(),
1151 session_id: body.session_id.clone(),
1152 is_staff: body.is_staff,
1153 time: time.timestamp_millis(),
1154 extension_id: event.extension_id,
1155 extension_version: event.version,
1156 dev: extension_metadata.is_none(),
1157 schema_version: extension_metadata
1158 .as_ref()
1159 .and_then(|metadata| metadata.manifest.schema_version),
1160 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1161 metadata
1162 .manifest
1163 .wasm_api_version
1164 .as_ref()
1165 .map(|version| version.to_string())
1166 }),
1167 }
1168 }
1169}
1170
1171#[derive(Serialize, Debug, clickhouse::Row)]
1172pub struct ReplEventRow {
1173 // AppInfoBase
1174 app_version: String,
1175 major: Option<i32>,
1176 minor: Option<i32>,
1177 patch: Option<i32>,
1178 checksum_matched: bool,
1179 release_channel: String,
1180 os_name: String,
1181 os_version: String,
1182
1183 // ClientEventBase
1184 installation_id: Option<String>,
1185 session_id: Option<String>,
1186 is_staff: Option<bool>,
1187 time: i64,
1188
1189 // ReplEventRow
1190 kernel_language: String,
1191 kernel_status: String,
1192 repl_session_id: String,
1193}
1194
1195impl ReplEventRow {
1196 fn from_event(
1197 event: ReplEvent,
1198 wrapper: &EventWrapper,
1199 body: &EventRequestBody,
1200 first_event_at: chrono::DateTime<chrono::Utc>,
1201 checksum_matched: bool,
1202 ) -> Self {
1203 let semver = body.semver();
1204 let time =
1205 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1206
1207 Self {
1208 app_version: body.app_version.clone(),
1209 major: semver.map(|v| v.major() as i32),
1210 minor: semver.map(|v| v.minor() as i32),
1211 patch: semver.map(|v| v.patch() as i32),
1212 checksum_matched,
1213 release_channel: body.release_channel.clone().unwrap_or_default(),
1214 os_name: body.os_name.clone(),
1215 os_version: body.os_version.clone().unwrap_or_default(),
1216 installation_id: body.installation_id.clone(),
1217 session_id: body.session_id.clone(),
1218 is_staff: body.is_staff,
1219 time: time.timestamp_millis(),
1220 kernel_language: event.kernel_language,
1221 kernel_status: event.kernel_status,
1222 repl_session_id: event.repl_session_id,
1223 }
1224 }
1225}
1226
1227#[derive(Serialize, Debug, clickhouse::Row)]
1228pub struct EditEventRow {
1229 // AppInfoBase
1230 app_version: String,
1231 major: Option<i32>,
1232 minor: Option<i32>,
1233 patch: Option<i32>,
1234 checksum_matched: bool,
1235 release_channel: String,
1236 os_name: String,
1237 os_version: String,
1238
1239 // ClientEventBase
1240 system_id: Option<String>,
1241 installation_id: Option<String>,
1242 // Note: This column name has a typo in the ClickHouse table.
1243 #[serde(rename = "sesssion_id")]
1244 session_id: Option<String>,
1245 is_staff: Option<bool>,
1246 time: i64,
1247
1248 // EditEventRow
1249 period_start: i64,
1250 period_end: i64,
1251 environment: String,
1252}
1253
1254impl EditEventRow {
1255 fn from_event(
1256 event: EditEvent,
1257 wrapper: &EventWrapper,
1258 body: &EventRequestBody,
1259 first_event_at: chrono::DateTime<chrono::Utc>,
1260 checksum_matched: bool,
1261 ) -> Self {
1262 let semver = body.semver();
1263 let time =
1264 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1265
1266 let period_start = time - chrono::Duration::milliseconds(event.duration);
1267 let period_end = time;
1268
1269 Self {
1270 app_version: body.app_version.clone(),
1271 major: semver.map(|v| v.major() as i32),
1272 minor: semver.map(|v| v.minor() as i32),
1273 patch: semver.map(|v| v.patch() as i32),
1274 checksum_matched,
1275 release_channel: body.release_channel.clone().unwrap_or_default(),
1276 os_name: body.os_name.clone(),
1277 os_version: body.os_version.clone().unwrap_or_default(),
1278 system_id: body.system_id.clone(),
1279 installation_id: body.installation_id.clone(),
1280 session_id: body.session_id.clone(),
1281 is_staff: body.is_staff,
1282 time: time.timestamp_millis(),
1283 period_start: period_start.timestamp_millis(),
1284 period_end: period_end.timestamp_millis(),
1285 environment: event.environment,
1286 }
1287 }
1288}
1289
1290#[derive(Serialize, Debug, clickhouse::Row)]
1291pub struct ActionEventRow {
1292 // AppInfoBase
1293 app_version: String,
1294 major: Option<i32>,
1295 minor: Option<i32>,
1296 patch: Option<i32>,
1297 checksum_matched: bool,
1298 release_channel: String,
1299 os_name: String,
1300 os_version: String,
1301
1302 // ClientEventBase
1303 installation_id: Option<String>,
1304 // Note: This column name has a typo in the ClickHouse table.
1305 #[serde(rename = "sesssion_id")]
1306 session_id: Option<String>,
1307 is_staff: Option<bool>,
1308 time: i64,
1309 // ActionEventRow
1310 source: String,
1311 action: String,
1312}
1313
1314impl ActionEventRow {
1315 fn from_event(
1316 event: ActionEvent,
1317 wrapper: &EventWrapper,
1318 body: &EventRequestBody,
1319 first_event_at: chrono::DateTime<chrono::Utc>,
1320 checksum_matched: bool,
1321 ) -> Self {
1322 let semver = body.semver();
1323 let time =
1324 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1325
1326 Self {
1327 app_version: body.app_version.clone(),
1328 major: semver.map(|v| v.major() as i32),
1329 minor: semver.map(|v| v.minor() as i32),
1330 patch: semver.map(|v| v.patch() as i32),
1331 checksum_matched,
1332 release_channel: body.release_channel.clone().unwrap_or_default(),
1333 os_name: body.os_name.clone(),
1334 os_version: body.os_version.clone().unwrap_or_default(),
1335 installation_id: body.installation_id.clone(),
1336 session_id: body.session_id.clone(),
1337 is_staff: body.is_staff,
1338 time: time.timestamp_millis(),
1339 source: event.source,
1340 action: event.action,
1341 }
1342 }
1343}
1344
1345pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1346 let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1347
1348 let mut summer = Sha256::new();
1349 summer.update(checksum_seed);
1350 summer.update(json);
1351 summer.update(checksum_seed);
1352 Some(summer.finalize().into_iter().collect())
1353}