1use super::ips_file::IpsFile;
2use crate::{api::slack, AppState, Error, Result};
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use rpc::ExtensionMetadata;
13use semantic_version::SemanticVersion;
14use serde::{Serialize, Serializer};
15use sha2::{Digest, Sha256};
16use std::sync::{Arc, OnceLock};
17use telemetry_events::{
18 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
19 EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
20};
21use uuid::Uuid;
22
23static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
24
25pub fn router() -> Router {
26 Router::new()
27 .route("/telemetry/events", post(post_events))
28 .route("/telemetry/crashes", post(post_crash))
29 .route("/telemetry/panics", post(post_panic))
30 .route("/telemetry/hangs", post(post_hang))
31}
32
33pub struct ZedChecksumHeader(Vec<u8>);
34
35impl Header for ZedChecksumHeader {
36 fn name() -> &'static HeaderName {
37 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
38 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
39 }
40
41 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
42 where
43 Self: Sized,
44 I: Iterator<Item = &'i axum::http::HeaderValue>,
45 {
46 let checksum = values
47 .next()
48 .ok_or_else(axum::headers::Error::invalid)?
49 .to_str()
50 .map_err(|_| axum::headers::Error::invalid())?;
51
52 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
53 Ok(Self(bytes))
54 }
55
56 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
57 unimplemented!()
58 }
59}
60
61pub struct CloudflareIpCountryHeader(String);
62
63impl Header for CloudflareIpCountryHeader {
64 fn name() -> &'static HeaderName {
65 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
66 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
67 }
68
69 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
70 where
71 Self: Sized,
72 I: Iterator<Item = &'i axum::http::HeaderValue>,
73 {
74 let country_code = values
75 .next()
76 .ok_or_else(axum::headers::Error::invalid)?
77 .to_str()
78 .map_err(|_| axum::headers::Error::invalid())?;
79
80 Ok(Self(country_code.to_string()))
81 }
82
83 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
84 unimplemented!()
85 }
86}
87
88pub async fn post_crash(
89 Extension(app): Extension<Arc<AppState>>,
90 headers: HeaderMap,
91 body: Bytes,
92) -> Result<()> {
93 let report = IpsFile::parse(&body)?;
94 let version_threshold = SemanticVersion::new(0, 123, 0);
95
96 let bundle_id = &report.header.bundle_id;
97 let app_version = &report.app_version();
98
99 if bundle_id == "dev.zed.Zed-Dev" {
100 log::error!("Crash uploads from {} are ignored.", bundle_id);
101 return Ok(());
102 }
103
104 if app_version.is_none() || app_version.unwrap() < version_threshold {
105 log::error!(
106 "Crash uploads from {} are ignored.",
107 report.header.app_version
108 );
109 return Ok(());
110 }
111 let app_version = app_version.unwrap();
112
113 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
114 let response = blob_store_client
115 .head_object()
116 .bucket(CRASH_REPORTS_BUCKET)
117 .key(report.header.incident_id.clone() + ".ips")
118 .send()
119 .await;
120
121 if response.is_ok() {
122 log::info!("We've already uploaded this crash");
123 return Ok(());
124 }
125
126 blob_store_client
127 .put_object()
128 .bucket(CRASH_REPORTS_BUCKET)
129 .key(report.header.incident_id.clone() + ".ips")
130 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
131 .body(ByteStream::from(body.to_vec()))
132 .send()
133 .await
134 .map_err(|e| log::error!("Failed to upload crash: {}", e))
135 .ok();
136 }
137
138 let recent_panic_on: Option<i64> = headers
139 .get("x-zed-panicked-on")
140 .and_then(|h| h.to_str().ok())
141 .and_then(|s| s.parse().ok());
142
143 let installation_id = headers
144 .get("x-zed-installation-id")
145 .and_then(|h| h.to_str().ok())
146 .map(|s| s.to_string())
147 .unwrap_or_default();
148
149 let mut recent_panic = None;
150
151 if let Some(recent_panic_on) = recent_panic_on {
152 let crashed_at = match report.timestamp() {
153 Ok(t) => Some(t),
154 Err(e) => {
155 log::error!("Can't parse {}: {}", report.header.timestamp, e);
156 None
157 }
158 };
159 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
160 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
161 }
162 }
163
164 let description = report.description(recent_panic);
165 let summary = report.backtrace_summary();
166
167 tracing::error!(
168 service = "client",
169 version = %report.header.app_version,
170 os_version = %report.header.os_version,
171 bundle_id = %report.header.bundle_id,
172 incident_id = %report.header.incident_id,
173 installation_id = %installation_id,
174 description = %description,
175 backtrace = %summary,
176 "crash report");
177
178 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
179 let payload = slack::WebhookBody::new(|w| {
180 w.add_section(|s| s.text(slack::Text::markdown(description)))
181 .add_section(|s| {
182 s.add_field(slack::Text::markdown(format!(
183 "*Version:*\n{} ({})",
184 bundle_id, app_version
185 )))
186 .add_field({
187 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
188 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
189 hostname.strip_prefix("http://").unwrap_or_default()
190 });
191
192 slack::Text::markdown(format!(
193 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
194 CRASH_REPORTS_BUCKET,
195 hostname,
196 report.header.incident_id,
197 report
198 .header
199 .incident_id
200 .chars()
201 .take(8)
202 .collect::<String>(),
203 ))
204 })
205 })
206 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
207 });
208 let payload_json = serde_json::to_string(&payload).map_err(|err| {
209 log::error!("Failed to serialize payload to JSON: {err}");
210 Error::Internal(anyhow!(err))
211 })?;
212
213 reqwest::Client::new()
214 .post(slack_panics_webhook)
215 .header("Content-Type", "application/json")
216 .body(payload_json)
217 .send()
218 .await
219 .map_err(|err| {
220 log::error!("Failed to send payload to Slack: {err}");
221 Error::Internal(anyhow!(err))
222 })?;
223 }
224
225 Ok(())
226}
227
228pub async fn post_hang(
229 Extension(app): Extension<Arc<AppState>>,
230 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
231 body: Bytes,
232) -> Result<()> {
233 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
234 return Err(Error::Http(
235 StatusCode::INTERNAL_SERVER_ERROR,
236 "events not enabled".into(),
237 ))?;
238 };
239
240 if checksum != expected {
241 return Err(Error::Http(
242 StatusCode::BAD_REQUEST,
243 "invalid checksum".into(),
244 ))?;
245 }
246
247 let incident_id = Uuid::new_v4().to_string();
248
249 // dump JSON into S3 so we can get frame offsets if we need to.
250 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
251 blob_store_client
252 .put_object()
253 .bucket(CRASH_REPORTS_BUCKET)
254 .key(incident_id.clone() + ".hang.json")
255 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
256 .body(ByteStream::from(body.to_vec()))
257 .send()
258 .await
259 .map_err(|e| log::error!("Failed to upload crash: {}", e))
260 .ok();
261 }
262
263 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
264 log::error!("can't parse report json: {err}");
265 Error::Internal(anyhow!(err))
266 })?;
267
268 let mut backtrace = "Possible hang detected on main thread:".to_string();
269 let unknown = "<unknown>".to_string();
270 for frame in report.backtrace.iter() {
271 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
272 }
273
274 tracing::error!(
275 service = "client",
276 version = %report.app_version.unwrap_or_default().to_string(),
277 os_name = %report.os_name,
278 os_version = report.os_version.unwrap_or_default().to_string(),
279 incident_id = %incident_id,
280 installation_id = %report.installation_id.unwrap_or_default(),
281 backtrace = %backtrace,
282 "hang report");
283
284 Ok(())
285}
286
287pub async fn post_panic(
288 Extension(app): Extension<Arc<AppState>>,
289 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
290 body: Bytes,
291) -> Result<()> {
292 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
293 return Err(Error::Http(
294 StatusCode::INTERNAL_SERVER_ERROR,
295 "events not enabled".into(),
296 ))?;
297 };
298
299 if checksum != expected {
300 return Err(Error::Http(
301 StatusCode::BAD_REQUEST,
302 "invalid checksum".into(),
303 ))?;
304 }
305
306 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
307 .map_err(|_| Error::Http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
308 let panic = report.panic;
309
310 tracing::error!(
311 service = "client",
312 version = %panic.app_version,
313 os_name = %panic.os_name,
314 os_version = %panic.os_version.clone().unwrap_or_default(),
315 installation_id = %panic.installation_id.unwrap_or_default(),
316 description = %panic.payload,
317 backtrace = %panic.backtrace.join("\n"),
318 "panic report");
319
320 let backtrace = if panic.backtrace.len() > 25 {
321 let total = panic.backtrace.len();
322 format!(
323 "{}\n and {} more",
324 panic
325 .backtrace
326 .iter()
327 .take(20)
328 .cloned()
329 .collect::<Vec<_>>()
330 .join("\n"),
331 total - 20
332 )
333 } else {
334 panic.backtrace.join("\n")
335 };
336 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
337
338 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
339 let payload = slack::WebhookBody::new(|w| {
340 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
341 .add_section(|s| {
342 s.add_field(slack::Text::markdown(format!(
343 "*Version:*\n {} ",
344 panic.app_version
345 )))
346 .add_field({
347 slack::Text::markdown(format!(
348 "*OS:*\n{} {}",
349 panic.os_name,
350 panic.os_version.unwrap_or_default()
351 ))
352 })
353 })
354 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
355 });
356 let payload_json = serde_json::to_string(&payload).map_err(|err| {
357 log::error!("Failed to serialize payload to JSON: {err}");
358 Error::Internal(anyhow!(err))
359 })?;
360
361 reqwest::Client::new()
362 .post(slack_panics_webhook)
363 .header("Content-Type", "application/json")
364 .body(payload_json)
365 .send()
366 .await
367 .map_err(|err| {
368 log::error!("Failed to send payload to Slack: {err}");
369 Error::Internal(anyhow!(err))
370 })?;
371 }
372
373 Ok(())
374}
375
376pub async fn post_events(
377 Extension(app): Extension<Arc<AppState>>,
378 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
379 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
380 body: Bytes,
381) -> Result<()> {
382 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
383 Err(Error::Http(
384 StatusCode::NOT_IMPLEMENTED,
385 "not supported".into(),
386 ))?
387 };
388
389 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
390 return Err(Error::Http(
391 StatusCode::INTERNAL_SERVER_ERROR,
392 "events not enabled".into(),
393 ))?;
394 };
395
396 if checksum != expected {
397 return Err(Error::Http(
398 StatusCode::BAD_REQUEST,
399 "invalid checksum".into(),
400 ))?;
401 }
402
403 let request_body: telemetry_events::EventRequestBody =
404 serde_json::from_slice(&body).map_err(|err| {
405 log::error!("can't parse event json: {err}");
406 Error::Internal(anyhow!(err))
407 })?;
408
409 let mut to_upload = ToUpload::default();
410 let Some(last_event) = request_body.events.last() else {
411 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
412 };
413 let country_code = country_code_header.map(|h| h.0 .0);
414
415 let first_event_at = chrono::Utc::now()
416 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
417
418 for wrapper in &request_body.events {
419 match &wrapper.event {
420 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
421 event.clone(),
422 &wrapper,
423 &request_body,
424 first_event_at,
425 country_code.clone(),
426 )),
427 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
428 event.clone(),
429 &wrapper,
430 &request_body,
431 first_event_at,
432 country_code.clone(),
433 )),
434 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
435 event.clone(),
436 &wrapper,
437 &request_body,
438 first_event_at,
439 )),
440 Event::Assistant(event) => {
441 to_upload
442 .assistant_events
443 .push(AssistantEventRow::from_event(
444 event.clone(),
445 &wrapper,
446 &request_body,
447 first_event_at,
448 ))
449 }
450 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
451 event.clone(),
452 &wrapper,
453 &request_body,
454 first_event_at,
455 )),
456 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
457 event.clone(),
458 &wrapper,
459 &request_body,
460 first_event_at,
461 )),
462 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
463 event.clone(),
464 &wrapper,
465 &request_body,
466 first_event_at,
467 )),
468 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
469 event.clone(),
470 &wrapper,
471 &request_body,
472 first_event_at,
473 )),
474 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
475 event.clone(),
476 &wrapper,
477 &request_body,
478 first_event_at,
479 )),
480 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
481 event.clone(),
482 &wrapper,
483 &request_body,
484 first_event_at,
485 )),
486 Event::Extension(event) => {
487 let metadata = app
488 .db
489 .get_extension_version(&event.extension_id, &event.version)
490 .await?;
491 to_upload
492 .extension_events
493 .push(ExtensionEventRow::from_event(
494 event.clone(),
495 &wrapper,
496 &request_body,
497 metadata,
498 first_event_at,
499 ))
500 }
501 }
502 }
503
504 to_upload
505 .upload(&clickhouse_client)
506 .await
507 .map_err(|err| Error::Internal(anyhow!(err)))?;
508
509 Ok(())
510}
511
512#[derive(Default)]
513struct ToUpload {
514 editor_events: Vec<EditorEventRow>,
515 copilot_events: Vec<CopilotEventRow>,
516 assistant_events: Vec<AssistantEventRow>,
517 call_events: Vec<CallEventRow>,
518 cpu_events: Vec<CpuEventRow>,
519 memory_events: Vec<MemoryEventRow>,
520 app_events: Vec<AppEventRow>,
521 setting_events: Vec<SettingEventRow>,
522 extension_events: Vec<ExtensionEventRow>,
523 edit_events: Vec<EditEventRow>,
524 action_events: Vec<ActionEventRow>,
525}
526
527impl ToUpload {
528 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
529 const EDITOR_EVENTS_TABLE: &str = "editor_events";
530 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
531 .await
532 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
533
534 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
535 Self::upload_to_table(
536 COPILOT_EVENTS_TABLE,
537 &self.copilot_events,
538 clickhouse_client,
539 )
540 .await
541 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
542
543 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
544 Self::upload_to_table(
545 ASSISTANT_EVENTS_TABLE,
546 &self.assistant_events,
547 clickhouse_client,
548 )
549 .await
550 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
551
552 const CALL_EVENTS_TABLE: &str = "call_events";
553 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
554 .await
555 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
556
557 const CPU_EVENTS_TABLE: &str = "cpu_events";
558 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
559 .await
560 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
561
562 const MEMORY_EVENTS_TABLE: &str = "memory_events";
563 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
564 .await
565 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
566
567 const APP_EVENTS_TABLE: &str = "app_events";
568 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
569 .await
570 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
571
572 const SETTING_EVENTS_TABLE: &str = "setting_events";
573 Self::upload_to_table(
574 SETTING_EVENTS_TABLE,
575 &self.setting_events,
576 clickhouse_client,
577 )
578 .await
579 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
580
581 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
582 Self::upload_to_table(
583 EXTENSION_EVENTS_TABLE,
584 &self.extension_events,
585 clickhouse_client,
586 )
587 .await
588 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
589
590 const EDIT_EVENTS_TABLE: &str = "edit_events";
591 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
592 .await
593 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
594
595 const ACTION_EVENTS_TABLE: &str = "action_events";
596 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
597 .await
598 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
599
600 Ok(())
601 }
602
603 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
604 table: &str,
605 rows: &[T],
606 clickhouse_client: &clickhouse::Client,
607 ) -> anyhow::Result<()> {
608 if !rows.is_empty() {
609 let mut insert = clickhouse_client.insert(table)?;
610
611 for event in rows {
612 insert.write(event).await?;
613 }
614
615 insert.end().await?;
616
617 let event_count = rows.len();
618 log::info!(
619 "wrote {event_count} {event_specifier} to '{table}'",
620 event_specifier = if event_count == 1 { "event" } else { "events" }
621 );
622 }
623
624 Ok(())
625 }
626}
627
628pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
629where
630 S: Serializer,
631{
632 if country_code.len() != 2 {
633 use serde::ser::Error;
634 return Err(S::Error::custom(
635 "country_code must be exactly 2 characters",
636 ));
637 }
638
639 let country_code = country_code.as_bytes();
640
641 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
642}
643
644#[derive(Serialize, Debug, clickhouse::Row)]
645pub struct EditorEventRow {
646 pub installation_id: String,
647 pub operation: String,
648 pub app_version: String,
649 pub file_extension: String,
650 pub os_name: String,
651 pub os_version: String,
652 pub release_channel: String,
653 pub signed_in: bool,
654 pub vim_mode: bool,
655 #[serde(serialize_with = "serialize_country_code")]
656 pub country_code: String,
657 pub region_code: String,
658 pub city: String,
659 pub time: i64,
660 pub copilot_enabled: bool,
661 pub copilot_enabled_for_language: bool,
662 pub historical_event: bool,
663 pub architecture: String,
664 pub is_staff: Option<bool>,
665 pub session_id: Option<String>,
666 pub major: Option<i32>,
667 pub minor: Option<i32>,
668 pub patch: Option<i32>,
669}
670
671impl EditorEventRow {
672 fn from_event(
673 event: EditorEvent,
674 wrapper: &EventWrapper,
675 body: &EventRequestBody,
676 first_event_at: chrono::DateTime<chrono::Utc>,
677 country_code: Option<String>,
678 ) -> Self {
679 let semver = body.semver();
680 let time =
681 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
682
683 Self {
684 app_version: body.app_version.clone(),
685 major: semver.map(|v| v.major() as i32),
686 minor: semver.map(|v| v.minor() as i32),
687 patch: semver.map(|v| v.patch() as i32),
688 release_channel: body.release_channel.clone().unwrap_or_default(),
689 os_name: body.os_name.clone(),
690 os_version: body.os_version.clone().unwrap_or_default(),
691 architecture: body.architecture.clone(),
692 installation_id: body.installation_id.clone().unwrap_or_default(),
693 session_id: body.session_id.clone(),
694 is_staff: body.is_staff,
695 time: time.timestamp_millis(),
696 operation: event.operation,
697 file_extension: event.file_extension.unwrap_or_default(),
698 signed_in: wrapper.signed_in,
699 vim_mode: event.vim_mode,
700 copilot_enabled: event.copilot_enabled,
701 copilot_enabled_for_language: event.copilot_enabled_for_language,
702 country_code: country_code.unwrap_or("XX".to_string()),
703 region_code: "".to_string(),
704 city: "".to_string(),
705 historical_event: false,
706 }
707 }
708}
709
710#[derive(Serialize, Debug, clickhouse::Row)]
711pub struct CopilotEventRow {
712 pub installation_id: String,
713 pub suggestion_id: String,
714 pub suggestion_accepted: bool,
715 pub app_version: String,
716 pub file_extension: String,
717 pub os_name: String,
718 pub os_version: String,
719 pub release_channel: String,
720 pub signed_in: bool,
721 #[serde(serialize_with = "serialize_country_code")]
722 pub country_code: String,
723 pub region_code: String,
724 pub city: String,
725 pub time: i64,
726 pub is_staff: Option<bool>,
727 pub session_id: Option<String>,
728 pub major: Option<i32>,
729 pub minor: Option<i32>,
730 pub patch: Option<i32>,
731}
732
733impl CopilotEventRow {
734 fn from_event(
735 event: CopilotEvent,
736 wrapper: &EventWrapper,
737 body: &EventRequestBody,
738 first_event_at: chrono::DateTime<chrono::Utc>,
739 country_code: Option<String>,
740 ) -> Self {
741 let semver = body.semver();
742 let time =
743 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
744
745 Self {
746 app_version: body.app_version.clone(),
747 major: semver.map(|v| v.major() as i32),
748 minor: semver.map(|v| v.minor() as i32),
749 patch: semver.map(|v| v.patch() as i32),
750 release_channel: body.release_channel.clone().unwrap_or_default(),
751 os_name: body.os_name.clone(),
752 os_version: body.os_version.clone().unwrap_or_default(),
753 installation_id: body.installation_id.clone().unwrap_or_default(),
754 session_id: body.session_id.clone(),
755 is_staff: body.is_staff,
756 time: time.timestamp_millis(),
757 file_extension: event.file_extension.unwrap_or_default(),
758 signed_in: wrapper.signed_in,
759 country_code: country_code.unwrap_or("XX".to_string()),
760 region_code: "".to_string(),
761 city: "".to_string(),
762 suggestion_id: event.suggestion_id.unwrap_or_default(),
763 suggestion_accepted: event.suggestion_accepted,
764 }
765 }
766}
767
768#[derive(Serialize, Debug, clickhouse::Row)]
769pub struct CallEventRow {
770 // AppInfoBase
771 app_version: String,
772 major: Option<i32>,
773 minor: Option<i32>,
774 patch: Option<i32>,
775 release_channel: String,
776
777 // ClientEventBase
778 installation_id: String,
779 session_id: Option<String>,
780 is_staff: Option<bool>,
781 time: i64,
782
783 // CallEventRow
784 operation: String,
785 room_id: Option<u64>,
786 channel_id: Option<u64>,
787}
788
789impl CallEventRow {
790 fn from_event(
791 event: CallEvent,
792 wrapper: &EventWrapper,
793 body: &EventRequestBody,
794 first_event_at: chrono::DateTime<chrono::Utc>,
795 ) -> Self {
796 let semver = body.semver();
797 let time =
798 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
799
800 Self {
801 app_version: body.app_version.clone(),
802 major: semver.map(|v| v.major() as i32),
803 minor: semver.map(|v| v.minor() as i32),
804 patch: semver.map(|v| v.patch() as i32),
805 release_channel: body.release_channel.clone().unwrap_or_default(),
806 installation_id: body.installation_id.clone().unwrap_or_default(),
807 session_id: body.session_id.clone(),
808 is_staff: body.is_staff,
809 time: time.timestamp_millis(),
810 operation: event.operation,
811 room_id: event.room_id,
812 channel_id: event.channel_id,
813 }
814 }
815}
816
817#[derive(Serialize, Debug, clickhouse::Row)]
818pub struct AssistantEventRow {
819 // AppInfoBase
820 app_version: String,
821 major: Option<i32>,
822 minor: Option<i32>,
823 patch: Option<i32>,
824 release_channel: String,
825
826 // ClientEventBase
827 installation_id: Option<String>,
828 session_id: Option<String>,
829 is_staff: Option<bool>,
830 time: i64,
831
832 // AssistantEventRow
833 conversation_id: String,
834 kind: String,
835 model: String,
836 response_latency_in_ms: Option<i64>,
837 error_message: Option<String>,
838}
839
840impl AssistantEventRow {
841 fn from_event(
842 event: AssistantEvent,
843 wrapper: &EventWrapper,
844 body: &EventRequestBody,
845 first_event_at: chrono::DateTime<chrono::Utc>,
846 ) -> Self {
847 let semver = body.semver();
848 let time =
849 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
850
851 Self {
852 app_version: body.app_version.clone(),
853 major: semver.map(|v| v.major() as i32),
854 minor: semver.map(|v| v.minor() as i32),
855 patch: semver.map(|v| v.patch() as i32),
856 release_channel: body.release_channel.clone().unwrap_or_default(),
857 installation_id: body.installation_id.clone(),
858 session_id: body.session_id.clone(),
859 is_staff: body.is_staff,
860 time: time.timestamp_millis(),
861 conversation_id: event.conversation_id.unwrap_or_default(),
862 kind: event.kind.to_string(),
863 model: event.model,
864 response_latency_in_ms: event
865 .response_latency
866 .map(|latency| latency.as_millis() as i64),
867 error_message: event.error_message,
868 }
869 }
870}
871
872#[derive(Debug, clickhouse::Row, Serialize)]
873pub struct CpuEventRow {
874 pub installation_id: Option<String>,
875 pub is_staff: Option<bool>,
876 pub usage_as_percentage: f32,
877 pub core_count: u32,
878 pub app_version: String,
879 pub release_channel: String,
880 pub time: i64,
881 pub session_id: Option<String>,
882 // pub normalized_cpu_usage: f64, MATERIALIZED
883 pub major: Option<i32>,
884 pub minor: Option<i32>,
885 pub patch: Option<i32>,
886}
887
888impl CpuEventRow {
889 fn from_event(
890 event: CpuEvent,
891 wrapper: &EventWrapper,
892 body: &EventRequestBody,
893 first_event_at: chrono::DateTime<chrono::Utc>,
894 ) -> Self {
895 let semver = body.semver();
896 let time =
897 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
898
899 Self {
900 app_version: body.app_version.clone(),
901 major: semver.map(|v| v.major() as i32),
902 minor: semver.map(|v| v.minor() as i32),
903 patch: semver.map(|v| v.patch() as i32),
904 release_channel: body.release_channel.clone().unwrap_or_default(),
905 installation_id: body.installation_id.clone(),
906 session_id: body.session_id.clone(),
907 is_staff: body.is_staff,
908 time: time.timestamp_millis(),
909 usage_as_percentage: event.usage_as_percentage,
910 core_count: event.core_count,
911 }
912 }
913}
914
915#[derive(Serialize, Debug, clickhouse::Row)]
916pub struct MemoryEventRow {
917 // AppInfoBase
918 app_version: String,
919 major: Option<i32>,
920 minor: Option<i32>,
921 patch: Option<i32>,
922 release_channel: String,
923
924 // ClientEventBase
925 installation_id: Option<String>,
926 session_id: Option<String>,
927 is_staff: Option<bool>,
928 time: i64,
929
930 // MemoryEventRow
931 memory_in_bytes: u64,
932 virtual_memory_in_bytes: u64,
933}
934
935impl MemoryEventRow {
936 fn from_event(
937 event: MemoryEvent,
938 wrapper: &EventWrapper,
939 body: &EventRequestBody,
940 first_event_at: chrono::DateTime<chrono::Utc>,
941 ) -> Self {
942 let semver = body.semver();
943 let time =
944 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
945
946 Self {
947 app_version: body.app_version.clone(),
948 major: semver.map(|v| v.major() as i32),
949 minor: semver.map(|v| v.minor() as i32),
950 patch: semver.map(|v| v.patch() as i32),
951 release_channel: body.release_channel.clone().unwrap_or_default(),
952 installation_id: body.installation_id.clone(),
953 session_id: body.session_id.clone(),
954 is_staff: body.is_staff,
955 time: time.timestamp_millis(),
956 memory_in_bytes: event.memory_in_bytes,
957 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
958 }
959 }
960}
961
962#[derive(Serialize, Debug, clickhouse::Row)]
963pub struct AppEventRow {
964 // AppInfoBase
965 app_version: String,
966 major: Option<i32>,
967 minor: Option<i32>,
968 patch: Option<i32>,
969 release_channel: String,
970
971 // ClientEventBase
972 installation_id: Option<String>,
973 session_id: Option<String>,
974 is_staff: Option<bool>,
975 time: i64,
976
977 // AppEventRow
978 operation: String,
979}
980
981impl AppEventRow {
982 fn from_event(
983 event: AppEvent,
984 wrapper: &EventWrapper,
985 body: &EventRequestBody,
986 first_event_at: chrono::DateTime<chrono::Utc>,
987 ) -> Self {
988 let semver = body.semver();
989 let time =
990 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
991
992 Self {
993 app_version: body.app_version.clone(),
994 major: semver.map(|v| v.major() as i32),
995 minor: semver.map(|v| v.minor() as i32),
996 patch: semver.map(|v| v.patch() as i32),
997 release_channel: body.release_channel.clone().unwrap_or_default(),
998 installation_id: body.installation_id.clone(),
999 session_id: body.session_id.clone(),
1000 is_staff: body.is_staff,
1001 time: time.timestamp_millis(),
1002 operation: event.operation,
1003 }
1004 }
1005}
1006
1007#[derive(Serialize, Debug, clickhouse::Row)]
1008pub struct SettingEventRow {
1009 // AppInfoBase
1010 app_version: String,
1011 major: Option<i32>,
1012 minor: Option<i32>,
1013 patch: Option<i32>,
1014 release_channel: String,
1015
1016 // ClientEventBase
1017 installation_id: Option<String>,
1018 session_id: Option<String>,
1019 is_staff: Option<bool>,
1020 time: i64,
1021 // SettingEventRow
1022 setting: String,
1023 value: String,
1024}
1025
1026impl SettingEventRow {
1027 fn from_event(
1028 event: SettingEvent,
1029 wrapper: &EventWrapper,
1030 body: &EventRequestBody,
1031 first_event_at: chrono::DateTime<chrono::Utc>,
1032 ) -> Self {
1033 let semver = body.semver();
1034 let time =
1035 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1036
1037 Self {
1038 app_version: body.app_version.clone(),
1039 major: semver.map(|v| v.major() as i32),
1040 minor: semver.map(|v| v.minor() as i32),
1041 patch: semver.map(|v| v.patch() as i32),
1042 release_channel: body.release_channel.clone().unwrap_or_default(),
1043 installation_id: body.installation_id.clone(),
1044 session_id: body.session_id.clone(),
1045 is_staff: body.is_staff,
1046 time: time.timestamp_millis(),
1047 setting: event.setting,
1048 value: event.value,
1049 }
1050 }
1051}
1052
1053#[derive(Serialize, Debug, clickhouse::Row)]
1054pub struct ExtensionEventRow {
1055 // AppInfoBase
1056 app_version: String,
1057 major: Option<i32>,
1058 minor: Option<i32>,
1059 patch: Option<i32>,
1060 release_channel: String,
1061
1062 // ClientEventBase
1063 installation_id: Option<String>,
1064 session_id: Option<String>,
1065 is_staff: Option<bool>,
1066 time: i64,
1067
1068 // ExtensionEventRow
1069 extension_id: Arc<str>,
1070 extension_version: Arc<str>,
1071 dev: bool,
1072 schema_version: Option<i32>,
1073 wasm_api_version: Option<String>,
1074}
1075
1076impl ExtensionEventRow {
1077 fn from_event(
1078 event: ExtensionEvent,
1079 wrapper: &EventWrapper,
1080 body: &EventRequestBody,
1081 extension_metadata: Option<ExtensionMetadata>,
1082 first_event_at: chrono::DateTime<chrono::Utc>,
1083 ) -> Self {
1084 let semver = body.semver();
1085 let time =
1086 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1087
1088 Self {
1089 app_version: body.app_version.clone(),
1090 major: semver.map(|v| v.major() as i32),
1091 minor: semver.map(|v| v.minor() as i32),
1092 patch: semver.map(|v| v.patch() as i32),
1093 release_channel: body.release_channel.clone().unwrap_or_default(),
1094 installation_id: body.installation_id.clone(),
1095 session_id: body.session_id.clone(),
1096 is_staff: body.is_staff,
1097 time: time.timestamp_millis(),
1098 extension_id: event.extension_id,
1099 extension_version: event.version,
1100 dev: extension_metadata.is_none(),
1101 schema_version: extension_metadata
1102 .as_ref()
1103 .and_then(|metadata| metadata.manifest.schema_version),
1104 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1105 metadata
1106 .manifest
1107 .wasm_api_version
1108 .as_ref()
1109 .map(|version| version.to_string())
1110 }),
1111 }
1112 }
1113}
1114
1115#[derive(Serialize, Debug, clickhouse::Row)]
1116pub struct EditEventRow {
1117 // AppInfoBase
1118 app_version: String,
1119 major: Option<i32>,
1120 minor: Option<i32>,
1121 patch: Option<i32>,
1122 release_channel: String,
1123
1124 // ClientEventBase
1125 installation_id: Option<String>,
1126 // Note: This column name has a typo in the ClickHouse table.
1127 #[serde(rename = "sesssion_id")]
1128 session_id: Option<String>,
1129 is_staff: Option<bool>,
1130 time: i64,
1131
1132 // EditEventRow
1133 period_start: i64,
1134 period_end: i64,
1135 environment: String,
1136}
1137
1138impl EditEventRow {
1139 fn from_event(
1140 event: EditEvent,
1141 wrapper: &EventWrapper,
1142 body: &EventRequestBody,
1143 first_event_at: chrono::DateTime<chrono::Utc>,
1144 ) -> Self {
1145 let semver = body.semver();
1146 let time =
1147 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1148
1149 let period_start = time - chrono::Duration::milliseconds(event.duration);
1150 let period_end = time;
1151
1152 Self {
1153 app_version: body.app_version.clone(),
1154 major: semver.map(|v| v.major() as i32),
1155 minor: semver.map(|v| v.minor() as i32),
1156 patch: semver.map(|v| v.patch() as i32),
1157 release_channel: body.release_channel.clone().unwrap_or_default(),
1158 installation_id: body.installation_id.clone(),
1159 session_id: body.session_id.clone(),
1160 is_staff: body.is_staff,
1161 time: time.timestamp_millis(),
1162 period_start: period_start.timestamp_millis(),
1163 period_end: period_end.timestamp_millis(),
1164 environment: event.environment,
1165 }
1166 }
1167}
1168
1169#[derive(Serialize, Debug, clickhouse::Row)]
1170pub struct ActionEventRow {
1171 // AppInfoBase
1172 app_version: String,
1173 major: Option<i32>,
1174 minor: Option<i32>,
1175 patch: Option<i32>,
1176 release_channel: String,
1177
1178 // ClientEventBase
1179 installation_id: Option<String>,
1180 // Note: This column name has a typo in the ClickHouse table.
1181 #[serde(rename = "sesssion_id")]
1182 session_id: Option<String>,
1183 is_staff: Option<bool>,
1184 time: i64,
1185 // ActionEventRow
1186 source: String,
1187 action: String,
1188}
1189
1190impl ActionEventRow {
1191 fn from_event(
1192 event: ActionEvent,
1193 wrapper: &EventWrapper,
1194 body: &EventRequestBody,
1195 first_event_at: chrono::DateTime<chrono::Utc>,
1196 ) -> Self {
1197 let semver = body.semver();
1198 let time =
1199 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1200
1201 Self {
1202 app_version: body.app_version.clone(),
1203 major: semver.map(|v| v.major() as i32),
1204 minor: semver.map(|v| v.minor() as i32),
1205 patch: semver.map(|v| v.patch() as i32),
1206 release_channel: body.release_channel.clone().unwrap_or_default(),
1207 installation_id: body.installation_id.clone(),
1208 session_id: body.session_id.clone(),
1209 is_staff: body.is_staff,
1210 time: time.timestamp_millis(),
1211 source: event.source,
1212 action: event.action,
1213 }
1214 }
1215}
1216
1217pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1218 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1219 return None;
1220 };
1221
1222 let mut summer = Sha256::new();
1223 summer.update(checksum_seed);
1224 summer.update(&json);
1225 summer.update(checksum_seed);
1226 Some(summer.finalize().into_iter().collect())
1227}