1use super::ips_file::IpsFile;
2use crate::{api::slack, AppState, Error, Result};
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use rpc::ExtensionMetadata;
13use semantic_version::SemanticVersion;
14use serde::{Serialize, Serializer};
15use sha2::{Digest, Sha256};
16use std::sync::{Arc, OnceLock};
17use telemetry_events::{
18 ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
19 EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
20 SettingEvent,
21};
22use uuid::Uuid;
23
24static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
25
26pub fn router() -> Router {
27 Router::new()
28 .route("/telemetry/events", post(post_events))
29 .route("/telemetry/crashes", post(post_crash))
30 .route("/telemetry/panics", post(post_panic))
31 .route("/telemetry/hangs", post(post_hang))
32}
33
34pub struct ZedChecksumHeader(Vec<u8>);
35
36impl Header for ZedChecksumHeader {
37 fn name() -> &'static HeaderName {
38 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
39 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
40 }
41
42 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
43 where
44 Self: Sized,
45 I: Iterator<Item = &'i axum::http::HeaderValue>,
46 {
47 let checksum = values
48 .next()
49 .ok_or_else(axum::headers::Error::invalid)?
50 .to_str()
51 .map_err(|_| axum::headers::Error::invalid())?;
52
53 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
54 Ok(Self(bytes))
55 }
56
57 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
58 unimplemented!()
59 }
60}
61
62pub struct CloudflareIpCountryHeader(String);
63
64impl Header for CloudflareIpCountryHeader {
65 fn name() -> &'static HeaderName {
66 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
67 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
68 }
69
70 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
71 where
72 Self: Sized,
73 I: Iterator<Item = &'i axum::http::HeaderValue>,
74 {
75 let country_code = values
76 .next()
77 .ok_or_else(axum::headers::Error::invalid)?
78 .to_str()
79 .map_err(|_| axum::headers::Error::invalid())?;
80
81 Ok(Self(country_code.to_string()))
82 }
83
84 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
85 unimplemented!()
86 }
87}
88
89pub async fn post_crash(
90 Extension(app): Extension<Arc<AppState>>,
91 headers: HeaderMap,
92 body: Bytes,
93) -> Result<()> {
94 let report = IpsFile::parse(&body)?;
95 let version_threshold = SemanticVersion::new(0, 123, 0);
96
97 let bundle_id = &report.header.bundle_id;
98 let app_version = &report.app_version();
99
100 if bundle_id == "dev.zed.Zed-Dev" {
101 log::error!("Crash uploads from {} are ignored.", bundle_id);
102 return Ok(());
103 }
104
105 if app_version.is_none() || app_version.unwrap() < version_threshold {
106 log::error!(
107 "Crash uploads from {} are ignored.",
108 report.header.app_version
109 );
110 return Ok(());
111 }
112 let app_version = app_version.unwrap();
113
114 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
115 let response = blob_store_client
116 .head_object()
117 .bucket(CRASH_REPORTS_BUCKET)
118 .key(report.header.incident_id.clone() + ".ips")
119 .send()
120 .await;
121
122 if response.is_ok() {
123 log::info!("We've already uploaded this crash");
124 return Ok(());
125 }
126
127 blob_store_client
128 .put_object()
129 .bucket(CRASH_REPORTS_BUCKET)
130 .key(report.header.incident_id.clone() + ".ips")
131 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
132 .body(ByteStream::from(body.to_vec()))
133 .send()
134 .await
135 .map_err(|e| log::error!("Failed to upload crash: {}", e))
136 .ok();
137 }
138
139 let recent_panic_on: Option<i64> = headers
140 .get("x-zed-panicked-on")
141 .and_then(|h| h.to_str().ok())
142 .and_then(|s| s.parse().ok());
143
144 let installation_id = headers
145 .get("x-zed-installation-id")
146 .and_then(|h| h.to_str().ok())
147 .map(|s| s.to_string())
148 .unwrap_or_default();
149
150 let mut recent_panic = None;
151
152 if let Some(recent_panic_on) = recent_panic_on {
153 let crashed_at = match report.timestamp() {
154 Ok(t) => Some(t),
155 Err(e) => {
156 log::error!("Can't parse {}: {}", report.header.timestamp, e);
157 None
158 }
159 };
160 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
161 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
162 }
163 }
164
165 let description = report.description(recent_panic);
166 let summary = report.backtrace_summary();
167
168 tracing::error!(
169 service = "client",
170 version = %report.header.app_version,
171 os_version = %report.header.os_version,
172 bundle_id = %report.header.bundle_id,
173 incident_id = %report.header.incident_id,
174 installation_id = %installation_id,
175 description = %description,
176 backtrace = %summary,
177 "crash report");
178
179 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
180 let payload = slack::WebhookBody::new(|w| {
181 w.add_section(|s| s.text(slack::Text::markdown(description)))
182 .add_section(|s| {
183 s.add_field(slack::Text::markdown(format!(
184 "*Version:*\n{} ({})",
185 bundle_id, app_version
186 )))
187 .add_field({
188 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
189 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
190 hostname.strip_prefix("http://").unwrap_or_default()
191 });
192
193 slack::Text::markdown(format!(
194 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
195 CRASH_REPORTS_BUCKET,
196 hostname,
197 report.header.incident_id,
198 report
199 .header
200 .incident_id
201 .chars()
202 .take(8)
203 .collect::<String>(),
204 ))
205 })
206 })
207 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
208 });
209 let payload_json = serde_json::to_string(&payload).map_err(|err| {
210 log::error!("Failed to serialize payload to JSON: {err}");
211 Error::Internal(anyhow!(err))
212 })?;
213
214 reqwest::Client::new()
215 .post(slack_panics_webhook)
216 .header("Content-Type", "application/json")
217 .body(payload_json)
218 .send()
219 .await
220 .map_err(|err| {
221 log::error!("Failed to send payload to Slack: {err}");
222 Error::Internal(anyhow!(err))
223 })?;
224 }
225
226 Ok(())
227}
228
229pub async fn post_hang(
230 Extension(app): Extension<Arc<AppState>>,
231 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
232 body: Bytes,
233) -> Result<()> {
234 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
235 return Err(Error::Http(
236 StatusCode::INTERNAL_SERVER_ERROR,
237 "events not enabled".into(),
238 ))?;
239 };
240
241 if checksum != expected {
242 return Err(Error::Http(
243 StatusCode::BAD_REQUEST,
244 "invalid checksum".into(),
245 ))?;
246 }
247
248 let incident_id = Uuid::new_v4().to_string();
249
250 // dump JSON into S3 so we can get frame offsets if we need to.
251 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
252 blob_store_client
253 .put_object()
254 .bucket(CRASH_REPORTS_BUCKET)
255 .key(incident_id.clone() + ".hang.json")
256 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
257 .body(ByteStream::from(body.to_vec()))
258 .send()
259 .await
260 .map_err(|e| log::error!("Failed to upload crash: {}", e))
261 .ok();
262 }
263
264 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
265 log::error!("can't parse report json: {err}");
266 Error::Internal(anyhow!(err))
267 })?;
268
269 let mut backtrace = "Possible hang detected on main thread:".to_string();
270 let unknown = "<unknown>".to_string();
271 for frame in report.backtrace.iter() {
272 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
273 }
274
275 tracing::error!(
276 service = "client",
277 version = %report.app_version.unwrap_or_default().to_string(),
278 os_name = %report.os_name,
279 os_version = report.os_version.unwrap_or_default().to_string(),
280 incident_id = %incident_id,
281 installation_id = %report.installation_id.unwrap_or_default(),
282 backtrace = %backtrace,
283 "hang report");
284
285 Ok(())
286}
287
288pub async fn post_panic(
289 Extension(app): Extension<Arc<AppState>>,
290 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
291 body: Bytes,
292) -> Result<()> {
293 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
294 return Err(Error::Http(
295 StatusCode::INTERNAL_SERVER_ERROR,
296 "events not enabled".into(),
297 ))?;
298 };
299
300 if checksum != expected {
301 return Err(Error::Http(
302 StatusCode::BAD_REQUEST,
303 "invalid checksum".into(),
304 ))?;
305 }
306
307 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
308 .map_err(|_| Error::Http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
309 let panic = report.panic;
310
311 if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
312 return Err(Error::Http(
313 StatusCode::BAD_REQUEST,
314 "invalid os version".into(),
315 ))?;
316 }
317
318 tracing::error!(
319 service = "client",
320 version = %panic.app_version,
321 os_name = %panic.os_name,
322 os_version = %panic.os_version.clone().unwrap_or_default(),
323 installation_id = %panic.installation_id.unwrap_or_default(),
324 description = %panic.payload,
325 backtrace = %panic.backtrace.join("\n"),
326 "panic report");
327
328 let backtrace = if panic.backtrace.len() > 25 {
329 let total = panic.backtrace.len();
330 format!(
331 "{}\n and {} more",
332 panic
333 .backtrace
334 .iter()
335 .take(20)
336 .cloned()
337 .collect::<Vec<_>>()
338 .join("\n"),
339 total - 20
340 )
341 } else {
342 panic.backtrace.join("\n")
343 };
344 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
345
346 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
347 let payload = slack::WebhookBody::new(|w| {
348 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
349 .add_section(|s| {
350 s.add_field(slack::Text::markdown(format!(
351 "*Version:*\n {} ",
352 panic.app_version
353 )))
354 .add_field({
355 slack::Text::markdown(format!(
356 "*OS:*\n{} {}",
357 panic.os_name,
358 panic.os_version.unwrap_or_default()
359 ))
360 })
361 })
362 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
363 });
364 let payload_json = serde_json::to_string(&payload).map_err(|err| {
365 log::error!("Failed to serialize payload to JSON: {err}");
366 Error::Internal(anyhow!(err))
367 })?;
368
369 reqwest::Client::new()
370 .post(slack_panics_webhook)
371 .header("Content-Type", "application/json")
372 .body(payload_json)
373 .send()
374 .await
375 .map_err(|err| {
376 log::error!("Failed to send payload to Slack: {err}");
377 Error::Internal(anyhow!(err))
378 })?;
379 }
380
381 Ok(())
382}
383
384pub async fn post_events(
385 Extension(app): Extension<Arc<AppState>>,
386 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
387 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
388 body: Bytes,
389) -> Result<()> {
390 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
391 Err(Error::Http(
392 StatusCode::NOT_IMPLEMENTED,
393 "not supported".into(),
394 ))?
395 };
396
397 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
398 return Err(Error::Http(
399 StatusCode::INTERNAL_SERVER_ERROR,
400 "events not enabled".into(),
401 ))?;
402 };
403
404 let checksum_matched = checksum == expected;
405
406 let request_body: telemetry_events::EventRequestBody =
407 serde_json::from_slice(&body).map_err(|err| {
408 log::error!("can't parse event json: {err}");
409 Error::Internal(anyhow!(err))
410 })?;
411
412 let mut to_upload = ToUpload::default();
413 let Some(last_event) = request_body.events.last() else {
414 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
415 };
416 let country_code = country_code_header.map(|h| h.0 .0);
417
418 let first_event_at = chrono::Utc::now()
419 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
420
421 for wrapper in &request_body.events {
422 match &wrapper.event {
423 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
424 event.clone(),
425 &wrapper,
426 &request_body,
427 first_event_at,
428 country_code.clone(),
429 checksum_matched,
430 )),
431 // Needed for clients sending old copilot_event types
432 Event::Copilot(_) => {}
433 Event::InlineCompletion(event) => {
434 to_upload
435 .inline_completion_events
436 .push(InlineCompletionEventRow::from_event(
437 event.clone(),
438 &wrapper,
439 &request_body,
440 first_event_at,
441 country_code.clone(),
442 checksum_matched,
443 ))
444 }
445 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
446 event.clone(),
447 &wrapper,
448 &request_body,
449 first_event_at,
450 checksum_matched,
451 )),
452 Event::Assistant(event) => {
453 to_upload
454 .assistant_events
455 .push(AssistantEventRow::from_event(
456 event.clone(),
457 &wrapper,
458 &request_body,
459 first_event_at,
460 checksum_matched,
461 ))
462 }
463 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
464 event.clone(),
465 &wrapper,
466 &request_body,
467 first_event_at,
468 checksum_matched,
469 )),
470 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
471 event.clone(),
472 &wrapper,
473 &request_body,
474 first_event_at,
475 checksum_matched,
476 )),
477 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
478 event.clone(),
479 &wrapper,
480 &request_body,
481 first_event_at,
482 checksum_matched,
483 )),
484 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
485 event.clone(),
486 &wrapper,
487 &request_body,
488 first_event_at,
489 checksum_matched,
490 )),
491 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
492 event.clone(),
493 &wrapper,
494 &request_body,
495 first_event_at,
496 checksum_matched,
497 )),
498 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
499 event.clone(),
500 &wrapper,
501 &request_body,
502 first_event_at,
503 checksum_matched,
504 )),
505 Event::Extension(event) => {
506 let metadata = app
507 .db
508 .get_extension_version(&event.extension_id, &event.version)
509 .await?;
510 to_upload
511 .extension_events
512 .push(ExtensionEventRow::from_event(
513 event.clone(),
514 &wrapper,
515 &request_body,
516 metadata,
517 first_event_at,
518 checksum_matched,
519 ))
520 }
521 Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
522 event.clone(),
523 &wrapper,
524 &request_body,
525 first_event_at,
526 checksum_matched,
527 )),
528 }
529 }
530
531 to_upload
532 .upload(&clickhouse_client)
533 .await
534 .map_err(|err| Error::Internal(anyhow!(err)))?;
535
536 Ok(())
537}
538
539#[derive(Default)]
540struct ToUpload {
541 editor_events: Vec<EditorEventRow>,
542 inline_completion_events: Vec<InlineCompletionEventRow>,
543 assistant_events: Vec<AssistantEventRow>,
544 call_events: Vec<CallEventRow>,
545 cpu_events: Vec<CpuEventRow>,
546 memory_events: Vec<MemoryEventRow>,
547 app_events: Vec<AppEventRow>,
548 setting_events: Vec<SettingEventRow>,
549 extension_events: Vec<ExtensionEventRow>,
550 edit_events: Vec<EditEventRow>,
551 action_events: Vec<ActionEventRow>,
552 repl_events: Vec<ReplEventRow>,
553}
554
555impl ToUpload {
556 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
557 const EDITOR_EVENTS_TABLE: &str = "editor_events";
558 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
559 .await
560 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
561
562 const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
563 Self::upload_to_table(
564 INLINE_COMPLETION_EVENTS_TABLE,
565 &self.inline_completion_events,
566 clickhouse_client,
567 )
568 .await
569 .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
570
571 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
572 Self::upload_to_table(
573 ASSISTANT_EVENTS_TABLE,
574 &self.assistant_events,
575 clickhouse_client,
576 )
577 .await
578 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
579
580 const CALL_EVENTS_TABLE: &str = "call_events";
581 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
582 .await
583 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
584
585 const CPU_EVENTS_TABLE: &str = "cpu_events";
586 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
587 .await
588 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
589
590 const MEMORY_EVENTS_TABLE: &str = "memory_events";
591 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
592 .await
593 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
594
595 const APP_EVENTS_TABLE: &str = "app_events";
596 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
597 .await
598 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
599
600 const SETTING_EVENTS_TABLE: &str = "setting_events";
601 Self::upload_to_table(
602 SETTING_EVENTS_TABLE,
603 &self.setting_events,
604 clickhouse_client,
605 )
606 .await
607 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
608
609 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
610 Self::upload_to_table(
611 EXTENSION_EVENTS_TABLE,
612 &self.extension_events,
613 clickhouse_client,
614 )
615 .await
616 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
617
618 const EDIT_EVENTS_TABLE: &str = "edit_events";
619 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
620 .await
621 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
622
623 const ACTION_EVENTS_TABLE: &str = "action_events";
624 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
625 .await
626 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
627
628 const REPL_EVENTS_TABLE: &str = "repl_events";
629 Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
630 .await
631 .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
632
633 Ok(())
634 }
635
636 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
637 table: &str,
638 rows: &[T],
639 clickhouse_client: &clickhouse::Client,
640 ) -> anyhow::Result<()> {
641 if rows.is_empty() {
642 return Ok(());
643 }
644
645 let mut insert = clickhouse_client.insert(table)?;
646
647 for event in rows {
648 insert.write(event).await?;
649 }
650
651 insert.end().await?;
652
653 let event_count = rows.len();
654 log::info!(
655 "wrote {event_count} {event_specifier} to '{table}'",
656 event_specifier = if event_count == 1 { "event" } else { "events" }
657 );
658
659 Ok(())
660 }
661}
662
663pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
664where
665 S: Serializer,
666{
667 if country_code.len() != 2 {
668 use serde::ser::Error;
669 return Err(S::Error::custom(
670 "country_code must be exactly 2 characters",
671 ));
672 }
673
674 let country_code = country_code.as_bytes();
675
676 serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
677}
678
679#[derive(Serialize, Debug, clickhouse::Row)]
680pub struct EditorEventRow {
681 installation_id: String,
682 metrics_id: String,
683 operation: String,
684 app_version: String,
685 file_extension: String,
686 os_name: String,
687 os_version: String,
688 release_channel: String,
689 signed_in: bool,
690 vim_mode: bool,
691 #[serde(serialize_with = "serialize_country_code")]
692 country_code: String,
693 region_code: String,
694 city: String,
695 time: i64,
696 copilot_enabled: bool,
697 copilot_enabled_for_language: bool,
698 historical_event: bool,
699 architecture: String,
700 is_staff: Option<bool>,
701 session_id: Option<String>,
702 major: Option<i32>,
703 minor: Option<i32>,
704 patch: Option<i32>,
705 checksum_matched: bool,
706}
707
708impl EditorEventRow {
709 fn from_event(
710 event: EditorEvent,
711 wrapper: &EventWrapper,
712 body: &EventRequestBody,
713 first_event_at: chrono::DateTime<chrono::Utc>,
714 country_code: Option<String>,
715 checksum_matched: bool,
716 ) -> Self {
717 let semver = body.semver();
718 let time =
719 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
720
721 Self {
722 app_version: body.app_version.clone(),
723 major: semver.map(|v| v.major() as i32),
724 minor: semver.map(|v| v.minor() as i32),
725 patch: semver.map(|v| v.patch() as i32),
726 checksum_matched,
727 release_channel: body.release_channel.clone().unwrap_or_default(),
728 os_name: body.os_name.clone(),
729 os_version: body.os_version.clone().unwrap_or_default(),
730 architecture: body.architecture.clone(),
731 installation_id: body.installation_id.clone().unwrap_or_default(),
732 metrics_id: body.metrics_id.clone().unwrap_or_default(),
733 session_id: body.session_id.clone(),
734 is_staff: body.is_staff,
735 time: time.timestamp_millis(),
736 operation: event.operation,
737 file_extension: event.file_extension.unwrap_or_default(),
738 signed_in: wrapper.signed_in,
739 vim_mode: event.vim_mode,
740 copilot_enabled: event.copilot_enabled,
741 copilot_enabled_for_language: event.copilot_enabled_for_language,
742 country_code: country_code.unwrap_or("XX".to_string()),
743 region_code: "".to_string(),
744 city: "".to_string(),
745 historical_event: false,
746 }
747 }
748}
749
750#[derive(Serialize, Debug, clickhouse::Row)]
751pub struct InlineCompletionEventRow {
752 installation_id: String,
753 provider: String,
754 suggestion_accepted: bool,
755 app_version: String,
756 file_extension: String,
757 os_name: String,
758 os_version: String,
759 release_channel: String,
760 signed_in: bool,
761 #[serde(serialize_with = "serialize_country_code")]
762 country_code: String,
763 region_code: String,
764 city: String,
765 time: i64,
766 is_staff: Option<bool>,
767 session_id: Option<String>,
768 major: Option<i32>,
769 minor: Option<i32>,
770 patch: Option<i32>,
771 checksum_matched: bool,
772}
773
774impl InlineCompletionEventRow {
775 fn from_event(
776 event: InlineCompletionEvent,
777 wrapper: &EventWrapper,
778 body: &EventRequestBody,
779 first_event_at: chrono::DateTime<chrono::Utc>,
780 country_code: Option<String>,
781 checksum_matched: bool,
782 ) -> Self {
783 let semver = body.semver();
784 let time =
785 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
786
787 Self {
788 app_version: body.app_version.clone(),
789 major: semver.map(|v| v.major() as i32),
790 minor: semver.map(|v| v.minor() as i32),
791 patch: semver.map(|v| v.patch() as i32),
792 checksum_matched,
793 release_channel: body.release_channel.clone().unwrap_or_default(),
794 os_name: body.os_name.clone(),
795 os_version: body.os_version.clone().unwrap_or_default(),
796 installation_id: body.installation_id.clone().unwrap_or_default(),
797 session_id: body.session_id.clone(),
798 is_staff: body.is_staff,
799 time: time.timestamp_millis(),
800 file_extension: event.file_extension.unwrap_or_default(),
801 signed_in: wrapper.signed_in,
802 country_code: country_code.unwrap_or("XX".to_string()),
803 region_code: "".to_string(),
804 city: "".to_string(),
805 provider: event.provider,
806 suggestion_accepted: event.suggestion_accepted,
807 }
808 }
809}
810
811#[derive(Serialize, Debug, clickhouse::Row)]
812pub struct CallEventRow {
813 // AppInfoBase
814 app_version: String,
815 major: Option<i32>,
816 minor: Option<i32>,
817 patch: Option<i32>,
818 release_channel: String,
819 os_name: String,
820 os_version: String,
821 checksum_matched: bool,
822
823 // ClientEventBase
824 installation_id: String,
825 session_id: Option<String>,
826 is_staff: Option<bool>,
827 time: i64,
828
829 // CallEventRow
830 operation: String,
831 room_id: Option<u64>,
832 channel_id: Option<u64>,
833}
834
835impl CallEventRow {
836 fn from_event(
837 event: CallEvent,
838 wrapper: &EventWrapper,
839 body: &EventRequestBody,
840 first_event_at: chrono::DateTime<chrono::Utc>,
841 checksum_matched: bool,
842 ) -> Self {
843 let semver = body.semver();
844 let time =
845 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
846
847 Self {
848 app_version: body.app_version.clone(),
849 major: semver.map(|v| v.major() as i32),
850 minor: semver.map(|v| v.minor() as i32),
851 patch: semver.map(|v| v.patch() as i32),
852 checksum_matched,
853 release_channel: body.release_channel.clone().unwrap_or_default(),
854 os_name: body.os_name.clone(),
855 os_version: body.os_version.clone().unwrap_or_default(),
856 installation_id: body.installation_id.clone().unwrap_or_default(),
857 session_id: body.session_id.clone(),
858 is_staff: body.is_staff,
859 time: time.timestamp_millis(),
860 operation: event.operation,
861 room_id: event.room_id,
862 channel_id: event.channel_id,
863 }
864 }
865}
866
867#[derive(Serialize, Debug, clickhouse::Row)]
868pub struct AssistantEventRow {
869 // AppInfoBase
870 app_version: String,
871 major: Option<i32>,
872 minor: Option<i32>,
873 patch: Option<i32>,
874 checksum_matched: bool,
875 release_channel: String,
876 os_name: String,
877 os_version: String,
878
879 // ClientEventBase
880 installation_id: Option<String>,
881 session_id: Option<String>,
882 is_staff: Option<bool>,
883 time: i64,
884
885 // AssistantEventRow
886 conversation_id: String,
887 kind: String,
888 model: String,
889 response_latency_in_ms: Option<i64>,
890 error_message: Option<String>,
891}
892
893impl AssistantEventRow {
894 fn from_event(
895 event: AssistantEvent,
896 wrapper: &EventWrapper,
897 body: &EventRequestBody,
898 first_event_at: chrono::DateTime<chrono::Utc>,
899 checksum_matched: bool,
900 ) -> Self {
901 let semver = body.semver();
902 let time =
903 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
904
905 Self {
906 app_version: body.app_version.clone(),
907 major: semver.map(|v| v.major() as i32),
908 minor: semver.map(|v| v.minor() as i32),
909 patch: semver.map(|v| v.patch() as i32),
910 checksum_matched,
911 release_channel: body.release_channel.clone().unwrap_or_default(),
912 os_name: body.os_name.clone(),
913 os_version: body.os_version.clone().unwrap_or_default(),
914 installation_id: body.installation_id.clone(),
915 session_id: body.session_id.clone(),
916 is_staff: body.is_staff,
917 time: time.timestamp_millis(),
918 conversation_id: event.conversation_id.unwrap_or_default(),
919 kind: event.kind.to_string(),
920 model: event.model,
921 response_latency_in_ms: event
922 .response_latency
923 .map(|latency| latency.as_millis() as i64),
924 error_message: event.error_message,
925 }
926 }
927}
928
929#[derive(Debug, clickhouse::Row, Serialize)]
930pub struct CpuEventRow {
931 installation_id: Option<String>,
932 is_staff: Option<bool>,
933 usage_as_percentage: f32,
934 core_count: u32,
935 app_version: String,
936 release_channel: String,
937 os_name: String,
938 os_version: String,
939 time: i64,
940 session_id: Option<String>,
941 // pub normalized_cpu_usage: f64, MATERIALIZED
942 major: Option<i32>,
943 minor: Option<i32>,
944 patch: Option<i32>,
945 checksum_matched: bool,
946}
947
948impl CpuEventRow {
949 fn from_event(
950 event: CpuEvent,
951 wrapper: &EventWrapper,
952 body: &EventRequestBody,
953 first_event_at: chrono::DateTime<chrono::Utc>,
954 checksum_matched: bool,
955 ) -> Self {
956 let semver = body.semver();
957 let time =
958 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
959
960 Self {
961 app_version: body.app_version.clone(),
962 major: semver.map(|v| v.major() as i32),
963 minor: semver.map(|v| v.minor() as i32),
964 patch: semver.map(|v| v.patch() as i32),
965 checksum_matched,
966 release_channel: body.release_channel.clone().unwrap_or_default(),
967 os_name: body.os_name.clone(),
968 os_version: body.os_version.clone().unwrap_or_default(),
969 installation_id: body.installation_id.clone(),
970 session_id: body.session_id.clone(),
971 is_staff: body.is_staff,
972 time: time.timestamp_millis(),
973 usage_as_percentage: event.usage_as_percentage,
974 core_count: event.core_count,
975 }
976 }
977}
978
979#[derive(Serialize, Debug, clickhouse::Row)]
980pub struct MemoryEventRow {
981 // AppInfoBase
982 app_version: String,
983 major: Option<i32>,
984 minor: Option<i32>,
985 patch: Option<i32>,
986 checksum_matched: bool,
987 release_channel: String,
988 os_name: String,
989 os_version: String,
990
991 // ClientEventBase
992 installation_id: Option<String>,
993 session_id: Option<String>,
994 is_staff: Option<bool>,
995 time: i64,
996
997 // MemoryEventRow
998 memory_in_bytes: u64,
999 virtual_memory_in_bytes: u64,
1000}
1001
1002impl MemoryEventRow {
1003 fn from_event(
1004 event: MemoryEvent,
1005 wrapper: &EventWrapper,
1006 body: &EventRequestBody,
1007 first_event_at: chrono::DateTime<chrono::Utc>,
1008 checksum_matched: bool,
1009 ) -> Self {
1010 let semver = body.semver();
1011 let time =
1012 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1013
1014 Self {
1015 app_version: body.app_version.clone(),
1016 major: semver.map(|v| v.major() as i32),
1017 minor: semver.map(|v| v.minor() as i32),
1018 patch: semver.map(|v| v.patch() as i32),
1019 checksum_matched,
1020 release_channel: body.release_channel.clone().unwrap_or_default(),
1021 os_name: body.os_name.clone(),
1022 os_version: body.os_version.clone().unwrap_or_default(),
1023 installation_id: body.installation_id.clone(),
1024 session_id: body.session_id.clone(),
1025 is_staff: body.is_staff,
1026 time: time.timestamp_millis(),
1027 memory_in_bytes: event.memory_in_bytes,
1028 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1029 }
1030 }
1031}
1032
1033#[derive(Serialize, Debug, clickhouse::Row)]
1034pub struct AppEventRow {
1035 // AppInfoBase
1036 app_version: String,
1037 major: Option<i32>,
1038 minor: Option<i32>,
1039 patch: Option<i32>,
1040 checksum_matched: bool,
1041 release_channel: String,
1042 os_name: String,
1043 os_version: String,
1044
1045 // ClientEventBase
1046 installation_id: Option<String>,
1047 session_id: Option<String>,
1048 is_staff: Option<bool>,
1049 time: i64,
1050
1051 // AppEventRow
1052 operation: String,
1053}
1054
1055impl AppEventRow {
1056 fn from_event(
1057 event: AppEvent,
1058 wrapper: &EventWrapper,
1059 body: &EventRequestBody,
1060 first_event_at: chrono::DateTime<chrono::Utc>,
1061 checksum_matched: bool,
1062 ) -> Self {
1063 let semver = body.semver();
1064 let time =
1065 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1066
1067 Self {
1068 app_version: body.app_version.clone(),
1069 major: semver.map(|v| v.major() as i32),
1070 minor: semver.map(|v| v.minor() as i32),
1071 patch: semver.map(|v| v.patch() as i32),
1072 checksum_matched,
1073 release_channel: body.release_channel.clone().unwrap_or_default(),
1074 os_name: body.os_name.clone(),
1075 os_version: body.os_version.clone().unwrap_or_default(),
1076 installation_id: body.installation_id.clone(),
1077 session_id: body.session_id.clone(),
1078 is_staff: body.is_staff,
1079 time: time.timestamp_millis(),
1080 operation: event.operation,
1081 }
1082 }
1083}
1084
1085#[derive(Serialize, Debug, clickhouse::Row)]
1086pub struct SettingEventRow {
1087 // AppInfoBase
1088 app_version: String,
1089 major: Option<i32>,
1090 minor: Option<i32>,
1091 patch: Option<i32>,
1092 checksum_matched: bool,
1093 release_channel: String,
1094 os_name: String,
1095 os_version: String,
1096
1097 // ClientEventBase
1098 installation_id: Option<String>,
1099 session_id: Option<String>,
1100 is_staff: Option<bool>,
1101 time: i64,
1102 // SettingEventRow
1103 setting: String,
1104 value: String,
1105}
1106
1107impl SettingEventRow {
1108 fn from_event(
1109 event: SettingEvent,
1110 wrapper: &EventWrapper,
1111 body: &EventRequestBody,
1112 first_event_at: chrono::DateTime<chrono::Utc>,
1113 checksum_matched: bool,
1114 ) -> Self {
1115 let semver = body.semver();
1116 let time =
1117 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1118
1119 Self {
1120 app_version: body.app_version.clone(),
1121 major: semver.map(|v| v.major() as i32),
1122 minor: semver.map(|v| v.minor() as i32),
1123 checksum_matched,
1124 patch: semver.map(|v| v.patch() as i32),
1125 release_channel: body.release_channel.clone().unwrap_or_default(),
1126 os_name: body.os_name.clone(),
1127 os_version: body.os_version.clone().unwrap_or_default(),
1128 installation_id: body.installation_id.clone(),
1129 session_id: body.session_id.clone(),
1130 is_staff: body.is_staff,
1131 time: time.timestamp_millis(),
1132 setting: event.setting,
1133 value: event.value,
1134 }
1135 }
1136}
1137
1138#[derive(Serialize, Debug, clickhouse::Row)]
1139pub struct ExtensionEventRow {
1140 // AppInfoBase
1141 app_version: String,
1142 major: Option<i32>,
1143 minor: Option<i32>,
1144 patch: Option<i32>,
1145 checksum_matched: bool,
1146 release_channel: String,
1147 os_name: String,
1148 os_version: String,
1149
1150 // ClientEventBase
1151 installation_id: Option<String>,
1152 session_id: Option<String>,
1153 is_staff: Option<bool>,
1154 time: i64,
1155
1156 // ExtensionEventRow
1157 extension_id: Arc<str>,
1158 extension_version: Arc<str>,
1159 dev: bool,
1160 schema_version: Option<i32>,
1161 wasm_api_version: Option<String>,
1162}
1163
1164impl ExtensionEventRow {
1165 fn from_event(
1166 event: ExtensionEvent,
1167 wrapper: &EventWrapper,
1168 body: &EventRequestBody,
1169 extension_metadata: Option<ExtensionMetadata>,
1170 first_event_at: chrono::DateTime<chrono::Utc>,
1171 checksum_matched: bool,
1172 ) -> Self {
1173 let semver = body.semver();
1174 let time =
1175 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1176
1177 Self {
1178 app_version: body.app_version.clone(),
1179 major: semver.map(|v| v.major() as i32),
1180 minor: semver.map(|v| v.minor() as i32),
1181 patch: semver.map(|v| v.patch() as i32),
1182 checksum_matched,
1183 release_channel: body.release_channel.clone().unwrap_or_default(),
1184 os_name: body.os_name.clone(),
1185 os_version: body.os_version.clone().unwrap_or_default(),
1186 installation_id: body.installation_id.clone(),
1187 session_id: body.session_id.clone(),
1188 is_staff: body.is_staff,
1189 time: time.timestamp_millis(),
1190 extension_id: event.extension_id,
1191 extension_version: event.version,
1192 dev: extension_metadata.is_none(),
1193 schema_version: extension_metadata
1194 .as_ref()
1195 .and_then(|metadata| metadata.manifest.schema_version),
1196 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1197 metadata
1198 .manifest
1199 .wasm_api_version
1200 .as_ref()
1201 .map(|version| version.to_string())
1202 }),
1203 }
1204 }
1205}
1206
1207#[derive(Serialize, Debug, clickhouse::Row)]
1208pub struct ReplEventRow {
1209 // AppInfoBase
1210 app_version: String,
1211 major: Option<i32>,
1212 minor: Option<i32>,
1213 patch: Option<i32>,
1214 checksum_matched: bool,
1215 release_channel: String,
1216 os_name: String,
1217 os_version: String,
1218
1219 // ClientEventBase
1220 installation_id: Option<String>,
1221 session_id: Option<String>,
1222 is_staff: Option<bool>,
1223 time: i64,
1224
1225 // ReplEventRow
1226 kernel_language: String,
1227 kernel_status: String,
1228 repl_session_id: String,
1229}
1230
1231impl ReplEventRow {
1232 fn from_event(
1233 event: ReplEvent,
1234 wrapper: &EventWrapper,
1235 body: &EventRequestBody,
1236 first_event_at: chrono::DateTime<chrono::Utc>,
1237 checksum_matched: bool,
1238 ) -> Self {
1239 let semver = body.semver();
1240 let time =
1241 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1242
1243 Self {
1244 app_version: body.app_version.clone(),
1245 major: semver.map(|v| v.major() as i32),
1246 minor: semver.map(|v| v.minor() as i32),
1247 patch: semver.map(|v| v.patch() as i32),
1248 checksum_matched,
1249 release_channel: body.release_channel.clone().unwrap_or_default(),
1250 os_name: body.os_name.clone(),
1251 os_version: body.os_version.clone().unwrap_or_default(),
1252 installation_id: body.installation_id.clone(),
1253 session_id: body.session_id.clone(),
1254 is_staff: body.is_staff,
1255 time: time.timestamp_millis(),
1256 kernel_language: event.kernel_language,
1257 kernel_status: event.kernel_status,
1258 repl_session_id: event.repl_session_id,
1259 }
1260 }
1261}
1262
1263#[derive(Serialize, Debug, clickhouse::Row)]
1264pub struct EditEventRow {
1265 // AppInfoBase
1266 app_version: String,
1267 major: Option<i32>,
1268 minor: Option<i32>,
1269 patch: Option<i32>,
1270 checksum_matched: bool,
1271 release_channel: String,
1272 os_name: String,
1273 os_version: String,
1274
1275 // ClientEventBase
1276 installation_id: Option<String>,
1277 // Note: This column name has a typo in the ClickHouse table.
1278 #[serde(rename = "sesssion_id")]
1279 session_id: Option<String>,
1280 is_staff: Option<bool>,
1281 time: i64,
1282
1283 // EditEventRow
1284 period_start: i64,
1285 period_end: i64,
1286 environment: String,
1287}
1288
1289impl EditEventRow {
1290 fn from_event(
1291 event: EditEvent,
1292 wrapper: &EventWrapper,
1293 body: &EventRequestBody,
1294 first_event_at: chrono::DateTime<chrono::Utc>,
1295 checksum_matched: bool,
1296 ) -> Self {
1297 let semver = body.semver();
1298 let time =
1299 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1300
1301 let period_start = time - chrono::Duration::milliseconds(event.duration);
1302 let period_end = time;
1303
1304 Self {
1305 app_version: body.app_version.clone(),
1306 major: semver.map(|v| v.major() as i32),
1307 minor: semver.map(|v| v.minor() as i32),
1308 patch: semver.map(|v| v.patch() as i32),
1309 checksum_matched,
1310 release_channel: body.release_channel.clone().unwrap_or_default(),
1311 os_name: body.os_name.clone(),
1312 os_version: body.os_version.clone().unwrap_or_default(),
1313 installation_id: body.installation_id.clone(),
1314 session_id: body.session_id.clone(),
1315 is_staff: body.is_staff,
1316 time: time.timestamp_millis(),
1317 period_start: period_start.timestamp_millis(),
1318 period_end: period_end.timestamp_millis(),
1319 environment: event.environment,
1320 }
1321 }
1322}
1323
1324#[derive(Serialize, Debug, clickhouse::Row)]
1325pub struct ActionEventRow {
1326 // AppInfoBase
1327 app_version: String,
1328 major: Option<i32>,
1329 minor: Option<i32>,
1330 patch: Option<i32>,
1331 checksum_matched: bool,
1332 release_channel: String,
1333 os_name: String,
1334 os_version: String,
1335
1336 // ClientEventBase
1337 installation_id: Option<String>,
1338 // Note: This column name has a typo in the ClickHouse table.
1339 #[serde(rename = "sesssion_id")]
1340 session_id: Option<String>,
1341 is_staff: Option<bool>,
1342 time: i64,
1343 // ActionEventRow
1344 source: String,
1345 action: String,
1346}
1347
1348impl ActionEventRow {
1349 fn from_event(
1350 event: ActionEvent,
1351 wrapper: &EventWrapper,
1352 body: &EventRequestBody,
1353 first_event_at: chrono::DateTime<chrono::Utc>,
1354 checksum_matched: bool,
1355 ) -> Self {
1356 let semver = body.semver();
1357 let time =
1358 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1359
1360 Self {
1361 app_version: body.app_version.clone(),
1362 major: semver.map(|v| v.major() as i32),
1363 minor: semver.map(|v| v.minor() as i32),
1364 patch: semver.map(|v| v.patch() as i32),
1365 checksum_matched,
1366 release_channel: body.release_channel.clone().unwrap_or_default(),
1367 os_name: body.os_name.clone(),
1368 os_version: body.os_version.clone().unwrap_or_default(),
1369 installation_id: body.installation_id.clone(),
1370 session_id: body.session_id.clone(),
1371 is_staff: body.is_staff,
1372 time: time.timestamp_millis(),
1373 source: event.source,
1374 action: event.action,
1375 }
1376 }
1377}
1378
1379pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1380 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1381 return None;
1382 };
1383
1384 let mut summer = Sha256::new();
1385 summer.update(checksum_seed);
1386 summer.update(&json);
1387 summer.update(checksum_seed);
1388 Some(summer.finalize().into_iter().collect())
1389}