1use super::ips_file::IpsFile;
2use crate::{api::slack, AppState, Error, Result};
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes,
7 headers::Header,
8 http::{HeaderMap, HeaderName, StatusCode},
9 routing::post,
10 Extension, Router, TypedHeader,
11};
12use rpc::ExtensionMetadata;
13use semantic_version::SemanticVersion;
14use serde::{Serialize, Serializer};
15use sha2::{Digest, Sha256};
16use std::sync::{Arc, OnceLock};
17use telemetry_events::{
18 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
19 EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
20};
21use uuid::Uuid;
22
23static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
24
25pub fn router() -> Router {
26 Router::new()
27 .route("/telemetry/events", post(post_events))
28 .route("/telemetry/crashes", post(post_crash))
29 .route("/telemetry/hangs", post(post_hang))
30}
31
32pub struct ZedChecksumHeader(Vec<u8>);
33
34impl Header for ZedChecksumHeader {
35 fn name() -> &'static HeaderName {
36 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
37 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
38 }
39
40 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
41 where
42 Self: Sized,
43 I: Iterator<Item = &'i axum::http::HeaderValue>,
44 {
45 let checksum = values
46 .next()
47 .ok_or_else(axum::headers::Error::invalid)?
48 .to_str()
49 .map_err(|_| axum::headers::Error::invalid())?;
50
51 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
52 Ok(Self(bytes))
53 }
54
55 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
56 unimplemented!()
57 }
58}
59
60pub struct CloudflareIpCountryHeader(String);
61
62impl Header for CloudflareIpCountryHeader {
63 fn name() -> &'static HeaderName {
64 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
65 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
66 }
67
68 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
69 where
70 Self: Sized,
71 I: Iterator<Item = &'i axum::http::HeaderValue>,
72 {
73 let country_code = values
74 .next()
75 .ok_or_else(axum::headers::Error::invalid)?
76 .to_str()
77 .map_err(|_| axum::headers::Error::invalid())?;
78
79 Ok(Self(country_code.to_string()))
80 }
81
82 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
83 unimplemented!()
84 }
85}
86
87pub async fn post_crash(
88 Extension(app): Extension<Arc<AppState>>,
89 headers: HeaderMap,
90 body: Bytes,
91) -> Result<()> {
92 let report = IpsFile::parse(&body)?;
93 let version_threshold = SemanticVersion::new(0, 123, 0);
94
95 let bundle_id = &report.header.bundle_id;
96 let app_version = &report.app_version();
97
98 if bundle_id == "dev.zed.Zed-Dev" {
99 log::error!("Crash uploads from {} are ignored.", bundle_id);
100 return Ok(());
101 }
102
103 if app_version.is_none() || app_version.unwrap() < version_threshold {
104 log::error!(
105 "Crash uploads from {} are ignored.",
106 report.header.app_version
107 );
108 return Ok(());
109 }
110 let app_version = app_version.unwrap();
111
112 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
113 let response = blob_store_client
114 .head_object()
115 .bucket(CRASH_REPORTS_BUCKET)
116 .key(report.header.incident_id.clone() + ".ips")
117 .send()
118 .await;
119
120 if response.is_ok() {
121 log::info!("We've already uploaded this crash");
122 return Ok(());
123 }
124
125 blob_store_client
126 .put_object()
127 .bucket(CRASH_REPORTS_BUCKET)
128 .key(report.header.incident_id.clone() + ".ips")
129 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
130 .body(ByteStream::from(body.to_vec()))
131 .send()
132 .await
133 .map_err(|e| log::error!("Failed to upload crash: {}", e))
134 .ok();
135 }
136
137 let recent_panic_on: Option<i64> = headers
138 .get("x-zed-panicked-on")
139 .and_then(|h| h.to_str().ok())
140 .and_then(|s| s.parse().ok());
141
142 let installation_id = headers
143 .get("x-zed-installation-id")
144 .and_then(|h| h.to_str().ok())
145 .map(|s| s.to_string())
146 .unwrap_or_default();
147
148 let mut recent_panic = None;
149
150 if let Some(recent_panic_on) = recent_panic_on {
151 let crashed_at = match report.timestamp() {
152 Ok(t) => Some(t),
153 Err(e) => {
154 log::error!("Can't parse {}: {}", report.header.timestamp, e);
155 None
156 }
157 };
158 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
159 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
160 }
161 }
162
163 let description = report.description(recent_panic);
164 let summary = report.backtrace_summary();
165
166 tracing::error!(
167 service = "client",
168 version = %report.header.app_version,
169 os_version = %report.header.os_version,
170 bundle_id = %report.header.bundle_id,
171 incident_id = %report.header.incident_id,
172 installation_id = %installation_id,
173 description = %description,
174 backtrace = %summary,
175 "crash report");
176
177 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
178 let payload = slack::WebhookBody::new(|w| {
179 w.add_section(|s| s.text(slack::Text::markdown(description)))
180 .add_section(|s| {
181 s.add_field(slack::Text::markdown(format!(
182 "*Version:*\n{} ({})",
183 bundle_id, app_version
184 )))
185 .add_field({
186 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
187 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
188 hostname.strip_prefix("http://").unwrap_or_default()
189 });
190
191 slack::Text::markdown(format!(
192 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
193 CRASH_REPORTS_BUCKET,
194 hostname,
195 report.header.incident_id,
196 report
197 .header
198 .incident_id
199 .chars()
200 .take(8)
201 .collect::<String>(),
202 ))
203 })
204 })
205 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
206 });
207 let payload_json = serde_json::to_string(&payload).map_err(|err| {
208 log::error!("Failed to serialize payload to JSON: {err}");
209 Error::Internal(anyhow!(err))
210 })?;
211
212 reqwest::Client::new()
213 .post(slack_panics_webhook)
214 .header("Content-Type", "application/json")
215 .body(payload_json)
216 .send()
217 .await
218 .map_err(|err| {
219 log::error!("Failed to send payload to Slack: {err}");
220 Error::Internal(anyhow!(err))
221 })?;
222 }
223
224 Ok(())
225}
226
227pub async fn post_hang(
228 Extension(app): Extension<Arc<AppState>>,
229 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
230 body: Bytes,
231) -> Result<()> {
232 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
233 return Err(Error::Http(
234 StatusCode::INTERNAL_SERVER_ERROR,
235 "events not enabled".into(),
236 ))?;
237 };
238
239 if checksum != expected {
240 return Err(Error::Http(
241 StatusCode::BAD_REQUEST,
242 "invalid checksum".into(),
243 ))?;
244 }
245
246 let incident_id = Uuid::new_v4().to_string();
247
248 // dump JSON into S3 so we can get frame offsets if we need to.
249 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
250 blob_store_client
251 .put_object()
252 .bucket(CRASH_REPORTS_BUCKET)
253 .key(incident_id.clone() + ".hang.json")
254 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
255 .body(ByteStream::from(body.to_vec()))
256 .send()
257 .await
258 .map_err(|e| log::error!("Failed to upload crash: {}", e))
259 .ok();
260 }
261
262 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
263 log::error!("can't parse report json: {err}");
264 Error::Internal(anyhow!(err))
265 })?;
266
267 let mut backtrace = "Possible hang detected on main thread:".to_string();
268 let unknown = "<unknown>".to_string();
269 for frame in report.backtrace.iter() {
270 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
271 }
272
273 tracing::error!(
274 service = "client",
275 version = %report.app_version.unwrap_or_default().to_string(),
276 os_name = %report.os_name,
277 os_version = report.os_version.unwrap_or_default().to_string(),
278 incident_id = %incident_id,
279 installation_id = %report.installation_id.unwrap_or_default(),
280 backtrace = %backtrace,
281 "hang report");
282
283 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
284 let payload = slack::WebhookBody::new(|w| {
285 w.add_section(|s| s.text(slack::Text::markdown("Possible Hang".to_string())))
286 .add_section(|s| {
287 s.add_field(slack::Text::markdown(format!(
288 "*Version:*\n {} ",
289 report.app_version.unwrap_or_default()
290 )))
291 .add_field({
292 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
293 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
294 hostname.strip_prefix("http://").unwrap_or_default()
295 });
296
297 slack::Text::markdown(format!(
298 "*Incident:*\n<https://{}.{}/{}.hang.json|{}…>",
299 CRASH_REPORTS_BUCKET,
300 hostname,
301 incident_id,
302 incident_id.chars().take(8).collect::<String>(),
303 ))
304 })
305 })
306 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace)))
307 });
308 let payload_json = serde_json::to_string(&payload).map_err(|err| {
309 log::error!("Failed to serialize payload to JSON: {err}");
310 Error::Internal(anyhow!(err))
311 })?;
312
313 reqwest::Client::new()
314 .post(slack_panics_webhook)
315 .header("Content-Type", "application/json")
316 .body(payload_json)
317 .send()
318 .await
319 .map_err(|err| {
320 log::error!("Failed to send payload to Slack: {err}");
321 Error::Internal(anyhow!(err))
322 })?;
323 }
324
325 Ok(())
326}
327
328pub async fn post_events(
329 Extension(app): Extension<Arc<AppState>>,
330 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
331 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
332 body: Bytes,
333) -> Result<()> {
334 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
335 Err(Error::Http(
336 StatusCode::NOT_IMPLEMENTED,
337 "not supported".into(),
338 ))?
339 };
340
341 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
342 return Err(Error::Http(
343 StatusCode::INTERNAL_SERVER_ERROR,
344 "events not enabled".into(),
345 ))?;
346 };
347
348 if checksum != expected {
349 return Err(Error::Http(
350 StatusCode::BAD_REQUEST,
351 "invalid checksum".into(),
352 ))?;
353 }
354
355 let request_body: telemetry_events::EventRequestBody =
356 serde_json::from_slice(&body).map_err(|err| {
357 log::error!("can't parse event json: {err}");
358 Error::Internal(anyhow!(err))
359 })?;
360
361 let mut to_upload = ToUpload::default();
362 let Some(last_event) = request_body.events.last() else {
363 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
364 };
365 let country_code = country_code_header.map(|h| h.0 .0);
366
367 let first_event_at = chrono::Utc::now()
368 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
369
370 for wrapper in &request_body.events {
371 match &wrapper.event {
372 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
373 event.clone(),
374 &wrapper,
375 &request_body,
376 first_event_at,
377 country_code.clone(),
378 )),
379 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
380 event.clone(),
381 &wrapper,
382 &request_body,
383 first_event_at,
384 country_code.clone(),
385 )),
386 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
387 event.clone(),
388 &wrapper,
389 &request_body,
390 first_event_at,
391 )),
392 Event::Assistant(event) => {
393 to_upload
394 .assistant_events
395 .push(AssistantEventRow::from_event(
396 event.clone(),
397 &wrapper,
398 &request_body,
399 first_event_at,
400 ))
401 }
402 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
403 event.clone(),
404 &wrapper,
405 &request_body,
406 first_event_at,
407 )),
408 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
409 event.clone(),
410 &wrapper,
411 &request_body,
412 first_event_at,
413 )),
414 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
415 event.clone(),
416 &wrapper,
417 &request_body,
418 first_event_at,
419 )),
420 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
421 event.clone(),
422 &wrapper,
423 &request_body,
424 first_event_at,
425 )),
426 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
427 event.clone(),
428 &wrapper,
429 &request_body,
430 first_event_at,
431 )),
432 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
433 event.clone(),
434 &wrapper,
435 &request_body,
436 first_event_at,
437 )),
438 Event::Extension(event) => {
439 let metadata = app
440 .db
441 .get_extension_version(&event.extension_id, &event.version)
442 .await?;
443 to_upload
444 .extension_events
445 .push(ExtensionEventRow::from_event(
446 event.clone(),
447 &wrapper,
448 &request_body,
449 metadata,
450 first_event_at,
451 ))
452 }
453 }
454 }
455
456 to_upload
457 .upload(&clickhouse_client)
458 .await
459 .map_err(|err| Error::Internal(anyhow!(err)))?;
460
461 Ok(())
462}
463
464#[derive(Default)]
465struct ToUpload {
466 editor_events: Vec<EditorEventRow>,
467 copilot_events: Vec<CopilotEventRow>,
468 assistant_events: Vec<AssistantEventRow>,
469 call_events: Vec<CallEventRow>,
470 cpu_events: Vec<CpuEventRow>,
471 memory_events: Vec<MemoryEventRow>,
472 app_events: Vec<AppEventRow>,
473 setting_events: Vec<SettingEventRow>,
474 extension_events: Vec<ExtensionEventRow>,
475 edit_events: Vec<EditEventRow>,
476 action_events: Vec<ActionEventRow>,
477}
478
479impl ToUpload {
480 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
481 const EDITOR_EVENTS_TABLE: &str = "editor_events";
482 Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
483 .await
484 .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
485
486 const COPILOT_EVENTS_TABLE: &str = "copilot_events";
487 Self::upload_to_table(
488 COPILOT_EVENTS_TABLE,
489 &self.copilot_events,
490 clickhouse_client,
491 )
492 .await
493 .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
494
495 const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
496 Self::upload_to_table(
497 ASSISTANT_EVENTS_TABLE,
498 &self.assistant_events,
499 clickhouse_client,
500 )
501 .await
502 .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
503
504 const CALL_EVENTS_TABLE: &str = "call_events";
505 Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
506 .await
507 .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
508
509 const CPU_EVENTS_TABLE: &str = "cpu_events";
510 Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
511 .await
512 .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
513
514 const MEMORY_EVENTS_TABLE: &str = "memory_events";
515 Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
516 .await
517 .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
518
519 const APP_EVENTS_TABLE: &str = "app_events";
520 Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
521 .await
522 .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
523
524 const SETTING_EVENTS_TABLE: &str = "setting_events";
525 Self::upload_to_table(
526 SETTING_EVENTS_TABLE,
527 &self.setting_events,
528 clickhouse_client,
529 )
530 .await
531 .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
532
533 const EXTENSION_EVENTS_TABLE: &str = "extension_events";
534 Self::upload_to_table(
535 EXTENSION_EVENTS_TABLE,
536 &self.extension_events,
537 clickhouse_client,
538 )
539 .await
540 .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
541
542 const EDIT_EVENTS_TABLE: &str = "edit_events";
543 Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
544 .await
545 .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
546
547 const ACTION_EVENTS_TABLE: &str = "action_events";
548 Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
549 .await
550 .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
551
552 Ok(())
553 }
554
555 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
556 table: &str,
557 rows: &[T],
558 clickhouse_client: &clickhouse::Client,
559 ) -> anyhow::Result<()> {
560 if !rows.is_empty() {
561 let mut insert = clickhouse_client.insert(table)?;
562
563 for event in rows {
564 insert.write(event).await?;
565 }
566
567 insert.end().await?;
568
569 let event_count = rows.len();
570 log::info!(
571 "wrote {event_count} {event_specifier} to '{table}'",
572 event_specifier = if event_count == 1 { "event" } else { "events" }
573 );
574 }
575
576 Ok(())
577 }
578}
579
580pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
581where
582 S: Serializer,
583{
584 if country_code.len() != 2 {
585 use serde::ser::Error;
586 return Err(S::Error::custom(
587 "country_code must be exactly 2 characters",
588 ));
589 }
590
591 let country_code = country_code.as_bytes();
592
593 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
594}
595
596#[derive(Serialize, Debug, clickhouse::Row)]
597pub struct EditorEventRow {
598 pub installation_id: String,
599 pub operation: String,
600 pub app_version: String,
601 pub file_extension: String,
602 pub os_name: String,
603 pub os_version: String,
604 pub release_channel: String,
605 pub signed_in: bool,
606 pub vim_mode: bool,
607 #[serde(serialize_with = "serialize_country_code")]
608 pub country_code: String,
609 pub region_code: String,
610 pub city: String,
611 pub time: i64,
612 pub copilot_enabled: bool,
613 pub copilot_enabled_for_language: bool,
614 pub historical_event: bool,
615 pub architecture: String,
616 pub is_staff: Option<bool>,
617 pub session_id: Option<String>,
618 pub major: Option<i32>,
619 pub minor: Option<i32>,
620 pub patch: Option<i32>,
621}
622
623impl EditorEventRow {
624 fn from_event(
625 event: EditorEvent,
626 wrapper: &EventWrapper,
627 body: &EventRequestBody,
628 first_event_at: chrono::DateTime<chrono::Utc>,
629 country_code: Option<String>,
630 ) -> Self {
631 let semver = body.semver();
632 let time =
633 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
634
635 Self {
636 app_version: body.app_version.clone(),
637 major: semver.map(|v| v.major() as i32),
638 minor: semver.map(|v| v.minor() as i32),
639 patch: semver.map(|v| v.patch() as i32),
640 release_channel: body.release_channel.clone().unwrap_or_default(),
641 os_name: body.os_name.clone(),
642 os_version: body.os_version.clone().unwrap_or_default(),
643 architecture: body.architecture.clone(),
644 installation_id: body.installation_id.clone().unwrap_or_default(),
645 session_id: body.session_id.clone(),
646 is_staff: body.is_staff,
647 time: time.timestamp_millis(),
648 operation: event.operation,
649 file_extension: event.file_extension.unwrap_or_default(),
650 signed_in: wrapper.signed_in,
651 vim_mode: event.vim_mode,
652 copilot_enabled: event.copilot_enabled,
653 copilot_enabled_for_language: event.copilot_enabled_for_language,
654 country_code: country_code.unwrap_or("XX".to_string()),
655 region_code: "".to_string(),
656 city: "".to_string(),
657 historical_event: false,
658 }
659 }
660}
661
662#[derive(Serialize, Debug, clickhouse::Row)]
663pub struct CopilotEventRow {
664 pub installation_id: String,
665 pub suggestion_id: String,
666 pub suggestion_accepted: bool,
667 pub app_version: String,
668 pub file_extension: String,
669 pub os_name: String,
670 pub os_version: String,
671 pub release_channel: String,
672 pub signed_in: bool,
673 #[serde(serialize_with = "serialize_country_code")]
674 pub country_code: String,
675 pub region_code: String,
676 pub city: String,
677 pub time: i64,
678 pub is_staff: Option<bool>,
679 pub session_id: Option<String>,
680 pub major: Option<i32>,
681 pub minor: Option<i32>,
682 pub patch: Option<i32>,
683}
684
685impl CopilotEventRow {
686 fn from_event(
687 event: CopilotEvent,
688 wrapper: &EventWrapper,
689 body: &EventRequestBody,
690 first_event_at: chrono::DateTime<chrono::Utc>,
691 country_code: Option<String>,
692 ) -> Self {
693 let semver = body.semver();
694 let time =
695 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
696
697 Self {
698 app_version: body.app_version.clone(),
699 major: semver.map(|v| v.major() as i32),
700 minor: semver.map(|v| v.minor() as i32),
701 patch: semver.map(|v| v.patch() as i32),
702 release_channel: body.release_channel.clone().unwrap_or_default(),
703 os_name: body.os_name.clone(),
704 os_version: body.os_version.clone().unwrap_or_default(),
705 installation_id: body.installation_id.clone().unwrap_or_default(),
706 session_id: body.session_id.clone(),
707 is_staff: body.is_staff,
708 time: time.timestamp_millis(),
709 file_extension: event.file_extension.unwrap_or_default(),
710 signed_in: wrapper.signed_in,
711 country_code: country_code.unwrap_or("XX".to_string()),
712 region_code: "".to_string(),
713 city: "".to_string(),
714 suggestion_id: event.suggestion_id.unwrap_or_default(),
715 suggestion_accepted: event.suggestion_accepted,
716 }
717 }
718}
719
720#[derive(Serialize, Debug, clickhouse::Row)]
721pub struct CallEventRow {
722 // AppInfoBase
723 app_version: String,
724 major: Option<i32>,
725 minor: Option<i32>,
726 patch: Option<i32>,
727 release_channel: String,
728
729 // ClientEventBase
730 installation_id: String,
731 session_id: Option<String>,
732 is_staff: Option<bool>,
733 time: i64,
734
735 // CallEventRow
736 operation: String,
737 room_id: Option<u64>,
738 channel_id: Option<u64>,
739}
740
741impl CallEventRow {
742 fn from_event(
743 event: CallEvent,
744 wrapper: &EventWrapper,
745 body: &EventRequestBody,
746 first_event_at: chrono::DateTime<chrono::Utc>,
747 ) -> Self {
748 let semver = body.semver();
749 let time =
750 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
751
752 Self {
753 app_version: body.app_version.clone(),
754 major: semver.map(|v| v.major() as i32),
755 minor: semver.map(|v| v.minor() as i32),
756 patch: semver.map(|v| v.patch() as i32),
757 release_channel: body.release_channel.clone().unwrap_or_default(),
758 installation_id: body.installation_id.clone().unwrap_or_default(),
759 session_id: body.session_id.clone(),
760 is_staff: body.is_staff,
761 time: time.timestamp_millis(),
762 operation: event.operation,
763 room_id: event.room_id,
764 channel_id: event.channel_id,
765 }
766 }
767}
768
769#[derive(Serialize, Debug, clickhouse::Row)]
770pub struct AssistantEventRow {
771 // AppInfoBase
772 app_version: String,
773 major: Option<i32>,
774 minor: Option<i32>,
775 patch: Option<i32>,
776 release_channel: String,
777
778 // ClientEventBase
779 installation_id: Option<String>,
780 session_id: Option<String>,
781 is_staff: Option<bool>,
782 time: i64,
783
784 // AssistantEventRow
785 conversation_id: String,
786 kind: String,
787 model: String,
788 response_latency_in_ms: Option<i64>,
789 error_message: Option<String>,
790}
791
792impl AssistantEventRow {
793 fn from_event(
794 event: AssistantEvent,
795 wrapper: &EventWrapper,
796 body: &EventRequestBody,
797 first_event_at: chrono::DateTime<chrono::Utc>,
798 ) -> Self {
799 let semver = body.semver();
800 let time =
801 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
802
803 Self {
804 app_version: body.app_version.clone(),
805 major: semver.map(|v| v.major() as i32),
806 minor: semver.map(|v| v.minor() as i32),
807 patch: semver.map(|v| v.patch() as i32),
808 release_channel: body.release_channel.clone().unwrap_or_default(),
809 installation_id: body.installation_id.clone(),
810 session_id: body.session_id.clone(),
811 is_staff: body.is_staff,
812 time: time.timestamp_millis(),
813 conversation_id: event.conversation_id.unwrap_or_default(),
814 kind: event.kind.to_string(),
815 model: event.model,
816 response_latency_in_ms: event
817 .response_latency
818 .map(|latency| latency.as_millis() as i64),
819 error_message: event.error_message,
820 }
821 }
822}
823
824#[derive(Debug, clickhouse::Row, Serialize)]
825pub struct CpuEventRow {
826 pub installation_id: Option<String>,
827 pub is_staff: Option<bool>,
828 pub usage_as_percentage: f32,
829 pub core_count: u32,
830 pub app_version: String,
831 pub release_channel: String,
832 pub time: i64,
833 pub session_id: Option<String>,
834 // pub normalized_cpu_usage: f64, MATERIALIZED
835 pub major: Option<i32>,
836 pub minor: Option<i32>,
837 pub patch: Option<i32>,
838}
839
840impl CpuEventRow {
841 fn from_event(
842 event: CpuEvent,
843 wrapper: &EventWrapper,
844 body: &EventRequestBody,
845 first_event_at: chrono::DateTime<chrono::Utc>,
846 ) -> Self {
847 let semver = body.semver();
848 let time =
849 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
850
851 Self {
852 app_version: body.app_version.clone(),
853 major: semver.map(|v| v.major() as i32),
854 minor: semver.map(|v| v.minor() as i32),
855 patch: semver.map(|v| v.patch() as i32),
856 release_channel: body.release_channel.clone().unwrap_or_default(),
857 installation_id: body.installation_id.clone(),
858 session_id: body.session_id.clone(),
859 is_staff: body.is_staff,
860 time: time.timestamp_millis(),
861 usage_as_percentage: event.usage_as_percentage,
862 core_count: event.core_count,
863 }
864 }
865}
866
867#[derive(Serialize, Debug, clickhouse::Row)]
868pub struct MemoryEventRow {
869 // AppInfoBase
870 app_version: String,
871 major: Option<i32>,
872 minor: Option<i32>,
873 patch: Option<i32>,
874 release_channel: String,
875
876 // ClientEventBase
877 installation_id: Option<String>,
878 session_id: Option<String>,
879 is_staff: Option<bool>,
880 time: i64,
881
882 // MemoryEventRow
883 memory_in_bytes: u64,
884 virtual_memory_in_bytes: u64,
885}
886
887impl MemoryEventRow {
888 fn from_event(
889 event: MemoryEvent,
890 wrapper: &EventWrapper,
891 body: &EventRequestBody,
892 first_event_at: chrono::DateTime<chrono::Utc>,
893 ) -> Self {
894 let semver = body.semver();
895 let time =
896 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
897
898 Self {
899 app_version: body.app_version.clone(),
900 major: semver.map(|v| v.major() as i32),
901 minor: semver.map(|v| v.minor() as i32),
902 patch: semver.map(|v| v.patch() as i32),
903 release_channel: body.release_channel.clone().unwrap_or_default(),
904 installation_id: body.installation_id.clone(),
905 session_id: body.session_id.clone(),
906 is_staff: body.is_staff,
907 time: time.timestamp_millis(),
908 memory_in_bytes: event.memory_in_bytes,
909 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
910 }
911 }
912}
913
914#[derive(Serialize, Debug, clickhouse::Row)]
915pub struct AppEventRow {
916 // AppInfoBase
917 app_version: String,
918 major: Option<i32>,
919 minor: Option<i32>,
920 patch: Option<i32>,
921 release_channel: String,
922
923 // ClientEventBase
924 installation_id: Option<String>,
925 session_id: Option<String>,
926 is_staff: Option<bool>,
927 time: i64,
928
929 // AppEventRow
930 operation: String,
931}
932
933impl AppEventRow {
934 fn from_event(
935 event: AppEvent,
936 wrapper: &EventWrapper,
937 body: &EventRequestBody,
938 first_event_at: chrono::DateTime<chrono::Utc>,
939 ) -> Self {
940 let semver = body.semver();
941 let time =
942 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
943
944 Self {
945 app_version: body.app_version.clone(),
946 major: semver.map(|v| v.major() as i32),
947 minor: semver.map(|v| v.minor() as i32),
948 patch: semver.map(|v| v.patch() as i32),
949 release_channel: body.release_channel.clone().unwrap_or_default(),
950 installation_id: body.installation_id.clone(),
951 session_id: body.session_id.clone(),
952 is_staff: body.is_staff,
953 time: time.timestamp_millis(),
954 operation: event.operation,
955 }
956 }
957}
958
959#[derive(Serialize, Debug, clickhouse::Row)]
960pub struct SettingEventRow {
961 // AppInfoBase
962 app_version: String,
963 major: Option<i32>,
964 minor: Option<i32>,
965 patch: Option<i32>,
966 release_channel: String,
967
968 // ClientEventBase
969 installation_id: Option<String>,
970 session_id: Option<String>,
971 is_staff: Option<bool>,
972 time: i64,
973 // SettingEventRow
974 setting: String,
975 value: String,
976}
977
978impl SettingEventRow {
979 fn from_event(
980 event: SettingEvent,
981 wrapper: &EventWrapper,
982 body: &EventRequestBody,
983 first_event_at: chrono::DateTime<chrono::Utc>,
984 ) -> Self {
985 let semver = body.semver();
986 let time =
987 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
988
989 Self {
990 app_version: body.app_version.clone(),
991 major: semver.map(|v| v.major() as i32),
992 minor: semver.map(|v| v.minor() as i32),
993 patch: semver.map(|v| v.patch() as i32),
994 release_channel: body.release_channel.clone().unwrap_or_default(),
995 installation_id: body.installation_id.clone(),
996 session_id: body.session_id.clone(),
997 is_staff: body.is_staff,
998 time: time.timestamp_millis(),
999 setting: event.setting,
1000 value: event.value,
1001 }
1002 }
1003}
1004
1005#[derive(Serialize, Debug, clickhouse::Row)]
1006pub struct ExtensionEventRow {
1007 // AppInfoBase
1008 app_version: String,
1009 major: Option<i32>,
1010 minor: Option<i32>,
1011 patch: Option<i32>,
1012 release_channel: String,
1013
1014 // ClientEventBase
1015 installation_id: Option<String>,
1016 session_id: Option<String>,
1017 is_staff: Option<bool>,
1018 time: i64,
1019
1020 // ExtensionEventRow
1021 extension_id: Arc<str>,
1022 extension_version: Arc<str>,
1023 dev: bool,
1024 schema_version: Option<i32>,
1025 wasm_api_version: Option<String>,
1026}
1027
1028impl ExtensionEventRow {
1029 fn from_event(
1030 event: ExtensionEvent,
1031 wrapper: &EventWrapper,
1032 body: &EventRequestBody,
1033 extension_metadata: Option<ExtensionMetadata>,
1034 first_event_at: chrono::DateTime<chrono::Utc>,
1035 ) -> Self {
1036 let semver = body.semver();
1037 let time =
1038 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1039
1040 Self {
1041 app_version: body.app_version.clone(),
1042 major: semver.map(|v| v.major() as i32),
1043 minor: semver.map(|v| v.minor() as i32),
1044 patch: semver.map(|v| v.patch() as i32),
1045 release_channel: body.release_channel.clone().unwrap_or_default(),
1046 installation_id: body.installation_id.clone(),
1047 session_id: body.session_id.clone(),
1048 is_staff: body.is_staff,
1049 time: time.timestamp_millis(),
1050 extension_id: event.extension_id,
1051 extension_version: event.version,
1052 dev: extension_metadata.is_none(),
1053 schema_version: extension_metadata
1054 .as_ref()
1055 .and_then(|metadata| metadata.manifest.schema_version),
1056 wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1057 metadata
1058 .manifest
1059 .wasm_api_version
1060 .as_ref()
1061 .map(|version| version.to_string())
1062 }),
1063 }
1064 }
1065}
1066
1067#[derive(Serialize, Debug, clickhouse::Row)]
1068pub struct EditEventRow {
1069 // AppInfoBase
1070 app_version: String,
1071 major: Option<i32>,
1072 minor: Option<i32>,
1073 patch: Option<i32>,
1074 release_channel: String,
1075
1076 // ClientEventBase
1077 installation_id: Option<String>,
1078 // Note: This column name has a typo in the ClickHouse table.
1079 #[serde(rename = "sesssion_id")]
1080 session_id: Option<String>,
1081 is_staff: Option<bool>,
1082 time: i64,
1083
1084 // EditEventRow
1085 period_start: i64,
1086 period_end: i64,
1087 environment: String,
1088}
1089
1090impl EditEventRow {
1091 fn from_event(
1092 event: EditEvent,
1093 wrapper: &EventWrapper,
1094 body: &EventRequestBody,
1095 first_event_at: chrono::DateTime<chrono::Utc>,
1096 ) -> Self {
1097 let semver = body.semver();
1098 let time =
1099 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1100
1101 let period_start = time - chrono::Duration::milliseconds(event.duration);
1102 let period_end = time;
1103
1104 Self {
1105 app_version: body.app_version.clone(),
1106 major: semver.map(|v| v.major() as i32),
1107 minor: semver.map(|v| v.minor() as i32),
1108 patch: semver.map(|v| v.patch() as i32),
1109 release_channel: body.release_channel.clone().unwrap_or_default(),
1110 installation_id: body.installation_id.clone(),
1111 session_id: body.session_id.clone(),
1112 is_staff: body.is_staff,
1113 time: time.timestamp_millis(),
1114 period_start: period_start.timestamp_millis(),
1115 period_end: period_end.timestamp_millis(),
1116 environment: event.environment,
1117 }
1118 }
1119}
1120
1121#[derive(Serialize, Debug, clickhouse::Row)]
1122pub struct ActionEventRow {
1123 // AppInfoBase
1124 app_version: String,
1125 major: Option<i32>,
1126 minor: Option<i32>,
1127 patch: Option<i32>,
1128 release_channel: String,
1129
1130 // ClientEventBase
1131 installation_id: Option<String>,
1132 // Note: This column name has a typo in the ClickHouse table.
1133 #[serde(rename = "sesssion_id")]
1134 session_id: Option<String>,
1135 is_staff: Option<bool>,
1136 time: i64,
1137 // ActionEventRow
1138 source: String,
1139 action: String,
1140}
1141
1142impl ActionEventRow {
1143 fn from_event(
1144 event: ActionEvent,
1145 wrapper: &EventWrapper,
1146 body: &EventRequestBody,
1147 first_event_at: chrono::DateTime<chrono::Utc>,
1148 ) -> Self {
1149 let semver = body.semver();
1150 let time =
1151 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1152
1153 Self {
1154 app_version: body.app_version.clone(),
1155 major: semver.map(|v| v.major() as i32),
1156 minor: semver.map(|v| v.minor() as i32),
1157 patch: semver.map(|v| v.patch() as i32),
1158 release_channel: body.release_channel.clone().unwrap_or_default(),
1159 installation_id: body.installation_id.clone(),
1160 session_id: body.session_id.clone(),
1161 is_staff: body.is_staff,
1162 time: time.timestamp_millis(),
1163 source: event.source,
1164 action: event.action,
1165 }
1166 }
1167}
1168
1169pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1170 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1171 return None;
1172 };
1173
1174 let mut summer = Sha256::new();
1175 summer.update(checksum_seed);
1176 summer.update(&json);
1177 summer.update(checksum_seed);
1178 Some(summer.finalize().into_iter().collect())
1179}