use super::ips_file::IpsFile;
use crate::api::CloudflareIpCountryHeader;
use crate::{AppState, Error, Result, api::slack};
use anyhow::anyhow;
use aws_sdk_s3::primitives::ByteStream;
use axum::{
    Extension, Router, TypedHeader,
    body::Bytes,
    headers::Header,
    http::{HeaderMap, HeaderName, StatusCode},
    routing::post,
};
use chrono::Duration;
use semantic_version::SemanticVersion;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use std::sync::{Arc, OnceLock};
use telemetry_events::{Event, EventRequestBody, Panic};
use util::ResultExt;
use uuid::Uuid;

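/// Blob store bucket that crash, panic, and hang reports are uploaded to.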
const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";

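/// Routes for client telemetry: structured events, crash reports, panics, and hangs.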
pub fn router() -> Router {
    Router::new()
        .route("/telemetry/events", post(post_events))
        .route("/telemetry/crashes", post(post_crash))
        .route("/telemetry/panics", post(post_panic))
        .route("/telemetry/hangs", post(post_hang))
}

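/// Typed `x-zed-checksum` header: a hex-encoded checksum of the request body,
/// verified against the value produced by `calculate_json_checksum`.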
pub struct ZedChecksumHeader(Vec<u8>);

impl Header for ZedChecksumHeader {
    fn name() -> &'static HeaderName {
        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
    }

    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
    where
        Self: Sized,
        I: Iterator<Item = &'i axum::http::HeaderValue>,
    {
        let checksum = values
            .next()
            .ok_or_else(axum::headers::Error::invalid)?
            .to_str()
            .map_err(|_| axum::headers::Error::invalid())?;

        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
        Ok(Self(bytes))
    }

    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
        unimplemented!()
    }
}

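/// Handles macOS `.ips` crash reports: skips dev builds and versions older than
/// 0.123.0, uploads the raw report to the blob store, links it to a recent panic
/// when the client reports one, and forwards a summary to tracing, Kinesis, and Slack.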
pub async fn post_crash(
    Extension(app): Extension<Arc<AppState>>,
    headers: HeaderMap,
    body: Bytes,
) -> Result<()> {
    let report = IpsFile::parse(&body)?;
    let version_threshold = SemanticVersion::new(0, 123, 0);

    let bundle_id = &report.header.bundle_id;
    let app_version = &report.app_version();

    if bundle_id == "dev.zed.Zed-Dev" {
        log::error!("Crash uploads from {} are ignored.", bundle_id);
        return Ok(());
    }

    if app_version.is_none() || app_version.unwrap() < version_threshold {
        log::error!(
            "Crash uploads from {} are ignored.",
            report.header.app_version
        );
        return Ok(());
    }
    let app_version = app_version.unwrap();

    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
        let response = blob_store_client
            .head_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(report.header.incident_id.clone() + ".ips")
            .send()
            .await;

        if response.is_ok() {
            log::info!("We've already uploaded this crash");
            return Ok(());
        }

        blob_store_client
            .put_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(report.header.incident_id.clone() + ".ips")
            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
            .body(ByteStream::from(body.to_vec()))
            .send()
            .await
            .map_err(|e| log::error!("Failed to upload crash: {}", e))
            .ok();
    }

    let recent_panic_on: Option<i64> = headers
        .get("x-zed-panicked-on")
        .and_then(|h| h.to_str().ok())
        .and_then(|s| s.parse().ok());

    let installation_id = headers
        .get("x-zed-installation-id")
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
        .unwrap_or_default();

    let mut recent_panic = None;

    if let Some(recent_panic_on) = recent_panic_on {
        let crashed_at = match report.timestamp() {
            Ok(t) => Some(t),
            Err(e) => {
                log::error!("Can't parse {}: {}", report.header.timestamp, e);
                None
            }
        };
        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
        }
    }

    let description = report.description(recent_panic);
    let summary = report.backtrace_summary();

    tracing::error!(
        service = "client",
        version = %report.header.app_version,
        os_version = %report.header.os_version,
        bundle_id = %report.header.bundle_id,
        incident_id = %report.header.incident_id,
        installation_id = %installation_id,
        description = %description,
        backtrace = %summary,
        "crash report"
    );

    if let Some(kinesis_client) = app.kinesis_client.clone()
        && let Some(stream) = app.config.kinesis_stream.clone()
    {
        let properties = json!({
            "app_version": report.header.app_version,
            "os_version": report.header.os_version,
            "os_name": "macOS",
            "bundle_id": report.header.bundle_id,
            "incident_id": report.header.incident_id,
            "installation_id": installation_id,
            "description": description,
            "backtrace": summary,
        });
        let row = SnowflakeRow::new(
            "Crash Reported",
            None,
            false,
            Some(installation_id),
            properties,
        );
        let data = serde_json::to_vec(&row)?;
        kinesis_client
            .put_record()
            .stream_name(stream)
            .partition_key(row.insert_id.unwrap_or_default())
            .data(data.into())
            .send()
            .await
            .log_err();
    }

    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
        let payload = slack::WebhookBody::new(|w| {
            w.add_section(|s| s.text(slack::Text::markdown(description)))
                .add_section(|s| {
                    s.add_field(slack::Text::markdown(format!(
                        "*Version:*\n{} ({})",
                        bundle_id, app_version
                    )))
                    .add_field({
                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
                            hostname.strip_prefix("http://").unwrap_or_default()
                        });

                        slack::Text::markdown(format!(
                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
                            CRASH_REPORTS_BUCKET,
                            hostname,
                            report.header.incident_id,
                            report
                                .header
                                .incident_id
                                .chars()
                                .take(8)
                                .collect::<String>(),
                        ))
                    })
                })
                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
        });
        let payload_json = serde_json::to_string(&payload).map_err(|err| {
            log::error!("Failed to serialize payload to JSON: {err}");
            Error::Internal(anyhow!(err))
        })?;

        reqwest::Client::new()
            .post(slack_panics_webhook)
            .header("Content-Type", "application/json")
            .body(payload_json)
            .send()
            .await
            .map_err(|err| {
                log::error!("Failed to send payload to Slack: {err}");
                Error::Internal(anyhow!(err))
            })?;
    }

    Ok(())
}

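/// Handles hang reports: verifies the body checksum, dumps the raw JSON into the
/// blob store (so frame offsets can be recovered later), and logs a main-thread
/// backtrace summary.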
pub async fn post_hang(
    Extension(app): Extension<Arc<AppState>>,
    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
    body: Bytes,
) -> Result<()> {
    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
        return Err(Error::http(
            StatusCode::INTERNAL_SERVER_ERROR,
            "events not enabled".into(),
        ))?;
    };

    if checksum != expected {
        return Err(Error::http(
            StatusCode::BAD_REQUEST,
            "invalid checksum".into(),
        ))?;
    }

    let incident_id = Uuid::new_v4().to_string();

    // dump JSON into S3 so we can get frame offsets if we need to.
    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
        blob_store_client
            .put_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(incident_id.clone() + ".hang.json")
            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
            .body(ByteStream::from(body.to_vec()))
            .send()
            .await
            .map_err(|e| log::error!("Failed to upload hang report: {}", e))
            .ok();
    }

    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
        log::error!("can't parse report json: {err}");
        Error::Internal(anyhow!(err))
    })?;

    let mut backtrace = "Possible hang detected on main thread:".to_string();
    let unknown = "<unknown>".to_string();
    for frame in report.backtrace.iter() {
        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
    }

    tracing::error!(
        service = "client",
        version = %report.app_version.unwrap_or_default().to_string(),
        os_name = %report.os_name,
        os_version = report.os_version.unwrap_or_default().to_string(),
        incident_id = %incident_id,
        installation_id = %report.installation_id.unwrap_or_default(),
        backtrace = %backtrace,
        "hang report"
    );

    Ok(())
}

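/// Handles panic reports: verifies the body checksum, uploads the raw JSON to the
/// blob store, and forwards a summary to tracing, Kinesis, and (when
/// `report_to_slack` allows it) the Slack panics webhook.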
pub async fn post_panic(
    Extension(app): Extension<Arc<AppState>>,
    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
    body: Bytes,
) -> Result<()> {
    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
        return Err(Error::http(
            StatusCode::INTERNAL_SERVER_ERROR,
            "events not enabled".into(),
        ))?;
    };

    if checksum != expected {
        return Err(Error::http(
            StatusCode::BAD_REQUEST,
            "invalid checksum".into(),
        ))?;
    }

    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
    let incident_id = uuid::Uuid::new_v4().to_string();
    let panic = report.panic;

    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
        return Err(Error::http(
            StatusCode::BAD_REQUEST,
            "invalid os version".into(),
        ))?;
    }

    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
        let response = blob_store_client
            .head_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(incident_id.clone() + ".json")
            .send()
            .await;

        if response.is_ok() {
            log::info!("We've already uploaded this panic");
            return Ok(());
        }

        blob_store_client
            .put_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(incident_id.clone() + ".json")
            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
            .body(ByteStream::from(body.to_vec()))
            .send()
            .await
            .map_err(|e| log::error!("Failed to upload panic: {}", e))
            .ok();
    }

    let backtrace = panic.backtrace.join("\n");

    tracing::error!(
        service = "client",
        version = %panic.app_version,
        os_name = %panic.os_name,
        os_version = %panic.os_version.clone().unwrap_or_default(),
        incident_id = %incident_id,
        installation_id = %panic.installation_id.clone().unwrap_or_default(),
        description = %panic.payload,
        backtrace = %backtrace,
        "panic report"
    );

    if let Some(kinesis_client) = app.kinesis_client.clone()
        && let Some(stream) = app.config.kinesis_stream.clone()
    {
        let properties = json!({
            "app_version": panic.app_version,
            "os_name": panic.os_name,
            "os_version": panic.os_version,
            "incident_id": incident_id,
            "installation_id": panic.installation_id,
            "description": panic.payload,
            "backtrace": backtrace,
        });
        let row = SnowflakeRow::new(
            "Panic Reported",
            None,
            false,
            panic.installation_id.clone(),
            properties,
        );
        let data = serde_json::to_vec(&row)?;
        kinesis_client
            .put_record()
            .stream_name(stream)
            .partition_key(row.insert_id.unwrap_or_default())
            .data(data.into())
            .send()
            .await
            .log_err();
    }

    if !report_to_slack(&panic) {
        return Ok(());
    }

    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
        let backtrace = if panic.backtrace.len() > 25 {
            let total = panic.backtrace.len();
            format!(
                "{}\n and {} more",
                panic
                    .backtrace
                    .iter()
                    .take(20)
                    .cloned()
                    .collect::<Vec<_>>()
                    .join("\n"),
                total - 20
            )
        } else {
            panic.backtrace.join("\n")
        };
        let backtrace_with_summary = panic.payload + "\n" + &backtrace;

        let version = if panic.release_channel == "nightly"
            && !panic.app_version.contains("remote-server")
            && let Some(sha) = panic.app_commit_sha
        {
            format!("Zed Nightly {}", sha.chars().take(7).collect::<String>())
        } else {
            panic.app_version
        };

        let payload = slack::WebhookBody::new(|w| {
            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
                .add_section(|s| {
                    s.add_field(slack::Text::markdown(format!("*Version:*\n {version} ",)))
                        .add_field({
                            let hostname = app.config.blob_store_url.clone().unwrap_or_default();
                            let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
                                hostname.strip_prefix("http://").unwrap_or_default()
                            });

                            slack::Text::markdown(format!(
                                "*{} {}:*\n<https://{}.{}/{}.json|{}…>",
                                panic.os_name,
                                panic.os_version.unwrap_or_default(),
                                CRASH_REPORTS_BUCKET,
                                hostname,
                                incident_id,
                                incident_id.chars().take(8).collect::<String>(),
                            ))
                        })
                })
                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
        });
        let payload_json = serde_json::to_string(&payload).map_err(|err| {
            log::error!("Failed to serialize payload to JSON: {err}");
            Error::Internal(anyhow!(err))
        })?;

        reqwest::Client::new()
            .post(slack_panics_webhook)
            .header("Content-Type", "application/json")
            .body(payload_json)
            .send()
            .await
            .map_err(|err| {
                log::error!("Failed to send payload to Slack: {err}");
                Error::Internal(anyhow!(err))
            })?;
    }

    Ok(())
}

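/// Filters out panics that shouldn't be posted to Slack, such as macOS panics
/// (which already arrive there as crash reports) and known GPU failure modes.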
fn report_to_slack(panic: &Panic) -> bool {
    // Panics on macOS should make their way to Slack as a crash report,
    // so we don't need to send them a second time via this channel.
    if panic.os_name == "macOS" {
        return false;
    }

    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
        return false;
    }

    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
        return false;
    }

    if panic
        .payload
        .contains("GPU has crashed, and no debug information is available")
    {
        return false;
    }

    true
}

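/// Handles batched telemetry events: verifies the body checksum and forwards each
/// event to the configured Kinesis stream as a `SnowflakeRow`.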
pub async fn post_events(
    Extension(app): Extension<Arc<AppState>>,
    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
    body: Bytes,
) -> Result<()> {
    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
        return Err(Error::http(
            StatusCode::INTERNAL_SERVER_ERROR,
            "events not enabled".into(),
        ))?;
    };

    let checksum_matched = checksum == expected;

    let request_body: telemetry_events::EventRequestBody =
        serde_json::from_slice(&body).map_err(|err| {
            log::error!("can't parse event json: {err}");
            Error::Internal(anyhow!(err))
        })?;

    let Some(last_event) = request_body.events.last() else {
        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
    };
    let country_code = country_code_header.map(|h| h.to_string());

    let first_event_at = chrono::Utc::now()
        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);

    if let Some(kinesis_client) = app.kinesis_client.clone()
        && let Some(stream) = app.config.kinesis_stream.clone()
    {
        let mut request = kinesis_client.put_records().stream_name(stream);
        let mut has_records = false;
        for row in for_snowflake(
            request_body.clone(),
            first_event_at,
            country_code.clone(),
            checksum_matched,
        ) {
            if let Some(data) = serde_json::to_vec(&row).log_err() {
                request = request.records(
                    aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
                        .partition_key(request_body.system_id.clone().unwrap_or_default())
                        .data(data.into())
                        .build()
                        .unwrap(),
                );
                has_records = true;
            }
        }
        if has_records {
            request.send().await.log_err();
        }
    }

    Ok(())
}

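/// Computes the checksum clients are expected to send in the `x-zed-checksum`
/// header: a SHA-256 digest of the request body wrapped on both sides by the
/// configured checksum seed. Returns `None` when no seed is configured.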
pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;

    let mut summer = Sha256::new();
    summer.update(checksum_seed);
    summer.update(json);
    summer.update(checksum_seed);
    Some(summer.finalize().into_iter().collect())
}

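/// Converts an `EventRequestBody` into one `SnowflakeRow` per event, resolving each
/// event's relative timestamp and copying request-level properties (app version, OS,
/// architecture, release channel, country, checksum status) into its properties map.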
fn for_snowflake(
    body: EventRequestBody,
    first_event_at: chrono::DateTime<chrono::Utc>,
    country_code: Option<String>,
    checksum_matched: bool,
) -> impl Iterator<Item = SnowflakeRow> {
    body.events.into_iter().map(move |event| {
        let timestamp =
            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
        let (event_type, mut event_properties) = match &event.event {
            Event::Flexible(e) => (
                e.event_type.clone(),
                serde_json::to_value(&e.event_properties).unwrap(),
            ),
        };

        if let serde_json::Value::Object(ref mut map) = event_properties {
            map.insert("app_version".to_string(), body.app_version.clone().into());
            map.insert("os_name".to_string(), body.os_name.clone().into());
            map.insert("os_version".to_string(), body.os_version.clone().into());
            map.insert("architecture".to_string(), body.architecture.clone().into());
            map.insert(
                "release_channel".to_string(),
                body.release_channel.clone().into(),
            );
            map.insert("signed_in".to_string(), event.signed_in.into());
            map.insert("checksum_matched".to_string(), checksum_matched.into());
            if let Some(country_code) = country_code.as_ref() {
                map.insert("country".to_string(), country_code.clone().into());
            }
        }

        // NOTE: most amplitude user properties are read out of our event_properties
        // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
        // for how that is configured.
        let user_properties = body.is_staff.map(|is_staff| {
            serde_json::json!({
                "is_staff": is_staff,
            })
        });

        SnowflakeRow {
            time: timestamp,
            user_id: body.metrics_id.clone(),
            device_id: body.system_id.clone(),
            event_type,
            event_properties,
            user_properties,
            insert_id: Some(Uuid::new_v4().to_string()),
        }
    })
}

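/// A single analytics event in the shape consumed by the downstream Snowflake pipeline.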
#[derive(Serialize, Deserialize, Debug)]
pub struct SnowflakeRow {
    pub time: chrono::DateTime<chrono::Utc>,
    pub user_id: Option<String>,
    pub device_id: Option<String>,
    pub event_type: String,
    pub event_properties: serde_json::Value,
    pub user_properties: Option<serde_json::Value>,
    pub insert_id: Option<String>,
}

impl SnowflakeRow {
    pub fn new(
        event_type: impl Into<String>,
        metrics_id: Option<Uuid>,
        is_staff: bool,
        system_id: Option<String>,
        event_properties: serde_json::Value,
    ) -> Self {
        Self {
            time: chrono::Utc::now(),
            event_type: event_type.into(),
            device_id: system_id,
            user_id: metrics_id.map(|id| id.to_string()),
            insert_id: Some(uuid::Uuid::new_v4().to_string()),
            event_properties,
            user_properties: Some(json!({"is_staff": is_staff})),
        }
    }

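    /// Writes this row to the given Kinesis stream, keyed by `user_id`.
    /// Missing client or stream configuration is treated as a no-op.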
    pub async fn write(
        self,
        client: &Option<aws_sdk_kinesis::Client>,
        stream: &Option<String>,
    ) -> anyhow::Result<()> {
        let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
            return Ok(());
        };
        let row = serde_json::to_vec(&self)?;
        client
            .put_record()
            .stream_name(stream)
            .partition_key(&self.user_id.unwrap_or_default())
            .data(row.into())
            .send()
            .await?;
        Ok(())
    }
}