events.rs

  1use super::ips_file::IpsFile;
  2use crate::api::CloudflareIpCountryHeader;
  3use crate::{AppState, Error, Result, api::slack};
  4use anyhow::anyhow;
  5use aws_sdk_s3::primitives::ByteStream;
  6use axum::{
  7    Extension, Router, TypedHeader,
  8    body::Bytes,
  9    headers::Header,
 10    http::{HeaderMap, HeaderName, StatusCode},
 11    routing::post,
 12};
 13use chrono::Duration;
 14use semantic_version::SemanticVersion;
 15use serde::{Deserialize, Serialize};
 16use serde_json::json;
 17use sha2::{Digest, Sha256};
 18use std::sync::{Arc, OnceLock};
 19use telemetry_events::{Event, EventRequestBody, Panic};
 20use util::ResultExt;
 21use uuid::Uuid;
 22
 23const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 24
 25pub fn router() -> Router {
 26    Router::new()
 27        .route("/telemetry/events", post(post_events))
 28        .route("/telemetry/crashes", post(post_crash))
 29        .route("/telemetry/panics", post(post_panic))
 30        .route("/telemetry/hangs", post(post_hang))
 31}
 32
 33pub struct ZedChecksumHeader(Vec<u8>);
 34
 35impl Header for ZedChecksumHeader {
 36    fn name() -> &'static HeaderName {
 37        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 38        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 39    }
 40
 41    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 42    where
 43        Self: Sized,
 44        I: Iterator<Item = &'i axum::http::HeaderValue>,
 45    {
 46        let checksum = values
 47            .next()
 48            .ok_or_else(axum::headers::Error::invalid)?
 49            .to_str()
 50            .map_err(|_| axum::headers::Error::invalid())?;
 51
 52        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 53        Ok(Self(bytes))
 54    }
 55
 56    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 57        unimplemented!()
 58    }
 59}
 60
 61pub async fn post_crash(
 62    Extension(app): Extension<Arc<AppState>>,
 63    headers: HeaderMap,
 64    body: Bytes,
 65) -> Result<()> {
 66    let report = IpsFile::parse(&body)?;
 67    let version_threshold = SemanticVersion::new(0, 123, 0);
 68
 69    let bundle_id = &report.header.bundle_id;
 70    let app_version = &report.app_version();
 71
 72    if bundle_id == "dev.zed.Zed-Dev" {
 73        log::error!("Crash uploads from {} are ignored.", bundle_id);
 74        return Ok(());
 75    }
 76
 77    if app_version.is_none() || app_version.unwrap() < version_threshold {
 78        log::error!(
 79            "Crash uploads from {} are ignored.",
 80            report.header.app_version
 81        );
 82        return Ok(());
 83    }
 84    let app_version = app_version.unwrap();
 85
 86    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 87        let response = blob_store_client
 88            .head_object()
 89            .bucket(CRASH_REPORTS_BUCKET)
 90            .key(report.header.incident_id.clone() + ".ips")
 91            .send()
 92            .await;
 93
 94        if response.is_ok() {
 95            log::info!("We've already uploaded this crash");
 96            return Ok(());
 97        }
 98
 99        blob_store_client
100            .put_object()
101            .bucket(CRASH_REPORTS_BUCKET)
102            .key(report.header.incident_id.clone() + ".ips")
103            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
104            .body(ByteStream::from(body.to_vec()))
105            .send()
106            .await
107            .map_err(|e| log::error!("Failed to upload crash: {}", e))
108            .ok();
109    }
110
111    let recent_panic_on: Option<i64> = headers
112        .get("x-zed-panicked-on")
113        .and_then(|h| h.to_str().ok())
114        .and_then(|s| s.parse().ok());
115
116    let installation_id = headers
117        .get("x-zed-installation-id")
118        .and_then(|h| h.to_str().ok())
119        .map(|s| s.to_string())
120        .unwrap_or_default();
121
122    let mut recent_panic = None;
123
124    if let Some(recent_panic_on) = recent_panic_on {
125        let crashed_at = match report.timestamp() {
126            Ok(t) => Some(t),
127            Err(e) => {
128                log::error!("Can't parse {}: {}", report.header.timestamp, e);
129                None
130            }
131        };
132        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
133            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
134        }
135    }
136
137    let description = report.description(recent_panic);
138    let summary = report.backtrace_summary();
139
140    tracing::error!(
141        service = "client",
142        version = %report.header.app_version,
143        os_version = %report.header.os_version,
144        bundle_id = %report.header.bundle_id,
145        incident_id = %report.header.incident_id,
146        installation_id = %installation_id,
147        description = %description,
148        backtrace = %summary,
149        "crash report"
150    );
151
152    if let Some(kinesis_client) = app.kinesis_client.clone()
153        && let Some(stream) = app.config.kinesis_stream.clone() {
154            let properties = json!({
155                "app_version": report.header.app_version,
156                "os_version": report.header.os_version,
157                "os_name": "macOS",
158                "bundle_id": report.header.bundle_id,
159                "incident_id": report.header.incident_id,
160                "installation_id": installation_id,
161                "description": description,
162                "backtrace": summary,
163            });
164            let row = SnowflakeRow::new(
165                "Crash Reported",
166                None,
167                false,
168                Some(installation_id),
169                properties,
170            );
171            let data = serde_json::to_vec(&row)?;
172            kinesis_client
173                .put_record()
174                .stream_name(stream)
175                .partition_key(row.insert_id.unwrap_or_default())
176                .data(data.into())
177                .send()
178                .await
179                .log_err();
180        }
181
182    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
183        let payload = slack::WebhookBody::new(|w| {
184            w.add_section(|s| s.text(slack::Text::markdown(description)))
185                .add_section(|s| {
186                    s.add_field(slack::Text::markdown(format!(
187                        "*Version:*\n{} ({})",
188                        bundle_id, app_version
189                    )))
190                    .add_field({
191                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
192                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
193                            hostname.strip_prefix("http://").unwrap_or_default()
194                        });
195
196                        slack::Text::markdown(format!(
197                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
198                            CRASH_REPORTS_BUCKET,
199                            hostname,
200                            report.header.incident_id,
201                            report
202                                .header
203                                .incident_id
204                                .chars()
205                                .take(8)
206                                .collect::<String>(),
207                        ))
208                    })
209                })
210                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
211        });
212        let payload_json = serde_json::to_string(&payload).map_err(|err| {
213            log::error!("Failed to serialize payload to JSON: {err}");
214            Error::Internal(anyhow!(err))
215        })?;
216
217        reqwest::Client::new()
218            .post(slack_panics_webhook)
219            .header("Content-Type", "application/json")
220            .body(payload_json)
221            .send()
222            .await
223            .map_err(|err| {
224                log::error!("Failed to send payload to Slack: {err}");
225                Error::Internal(anyhow!(err))
226            })?;
227    }
228
229    Ok(())
230}
231
232pub async fn post_hang(
233    Extension(app): Extension<Arc<AppState>>,
234    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
235    body: Bytes,
236) -> Result<()> {
237    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
238        return Err(Error::http(
239            StatusCode::INTERNAL_SERVER_ERROR,
240            "events not enabled".into(),
241        ))?;
242    };
243
244    if checksum != expected {
245        return Err(Error::http(
246            StatusCode::BAD_REQUEST,
247            "invalid checksum".into(),
248        ))?;
249    }
250
251    let incident_id = Uuid::new_v4().to_string();
252
253    // dump JSON into S3 so we can get frame offsets if we need to.
254    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
255        blob_store_client
256            .put_object()
257            .bucket(CRASH_REPORTS_BUCKET)
258            .key(incident_id.clone() + ".hang.json")
259            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
260            .body(ByteStream::from(body.to_vec()))
261            .send()
262            .await
263            .map_err(|e| log::error!("Failed to upload crash: {}", e))
264            .ok();
265    }
266
267    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
268        log::error!("can't parse report json: {err}");
269        Error::Internal(anyhow!(err))
270    })?;
271
272    let mut backtrace = "Possible hang detected on main thread:".to_string();
273    let unknown = "<unknown>".to_string();
274    for frame in report.backtrace.iter() {
275        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
276    }
277
278    tracing::error!(
279        service = "client",
280        version = %report.app_version.unwrap_or_default().to_string(),
281        os_name = %report.os_name,
282        os_version = report.os_version.unwrap_or_default().to_string(),
283        incident_id = %incident_id,
284        installation_id = %report.installation_id.unwrap_or_default(),
285        backtrace = %backtrace,
286        "hang report");
287
288    Ok(())
289}
290
291pub async fn post_panic(
292    Extension(app): Extension<Arc<AppState>>,
293    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
294    body: Bytes,
295) -> Result<()> {
296    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
297        return Err(Error::http(
298            StatusCode::INTERNAL_SERVER_ERROR,
299            "events not enabled".into(),
300        ))?;
301    };
302
303    if checksum != expected {
304        return Err(Error::http(
305            StatusCode::BAD_REQUEST,
306            "invalid checksum".into(),
307        ))?;
308    }
309
310    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
311        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
312    let incident_id = uuid::Uuid::new_v4().to_string();
313    let panic = report.panic;
314
315    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
316        return Err(Error::http(
317            StatusCode::BAD_REQUEST,
318            "invalid os version".into(),
319        ))?;
320    }
321
322    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
323        let response = blob_store_client
324            .head_object()
325            .bucket(CRASH_REPORTS_BUCKET)
326            .key(incident_id.clone() + ".json")
327            .send()
328            .await;
329
330        if response.is_ok() {
331            log::info!("We've already uploaded this crash");
332            return Ok(());
333        }
334
335        blob_store_client
336            .put_object()
337            .bucket(CRASH_REPORTS_BUCKET)
338            .key(incident_id.clone() + ".json")
339            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
340            .body(ByteStream::from(body.to_vec()))
341            .send()
342            .await
343            .map_err(|e| log::error!("Failed to upload crash: {}", e))
344            .ok();
345    }
346
347    let backtrace = panic.backtrace.join("\n");
348
349    tracing::error!(
350        service = "client",
351        version = %panic.app_version,
352        os_name = %panic.os_name,
353        os_version = %panic.os_version.clone().unwrap_or_default(),
354        incident_id = %incident_id,
355        installation_id = %panic.installation_id.clone().unwrap_or_default(),
356        description = %panic.payload,
357        backtrace = %backtrace,
358        "panic report"
359    );
360
361    if let Some(kinesis_client) = app.kinesis_client.clone()
362        && let Some(stream) = app.config.kinesis_stream.clone() {
363            let properties = json!({
364                "app_version": panic.app_version,
365                "os_name": panic.os_name,
366                "os_version": panic.os_version,
367                "incident_id": incident_id,
368                "installation_id": panic.installation_id,
369                "description": panic.payload,
370                "backtrace": backtrace,
371            });
372            let row = SnowflakeRow::new(
373                "Panic Reported",
374                None,
375                false,
376                panic.installation_id.clone(),
377                properties,
378            );
379            let data = serde_json::to_vec(&row)?;
380            kinesis_client
381                .put_record()
382                .stream_name(stream)
383                .partition_key(row.insert_id.unwrap_or_default())
384                .data(data.into())
385                .send()
386                .await
387                .log_err();
388        }
389
390    if !report_to_slack(&panic) {
391        return Ok(());
392    }
393
394    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
395        let backtrace = if panic.backtrace.len() > 25 {
396            let total = panic.backtrace.len();
397            format!(
398                "{}\n   and {} more",
399                panic
400                    .backtrace
401                    .iter()
402                    .take(20)
403                    .cloned()
404                    .collect::<Vec<_>>()
405                    .join("\n"),
406                total - 20
407            )
408        } else {
409            panic.backtrace.join("\n")
410        };
411        let backtrace_with_summary = panic.payload + "\n" + &backtrace;
412
413        let version = if panic.release_channel == "nightly"
414            && !panic.app_version.contains("remote-server")
415            && let Some(sha) = panic.app_commit_sha
416        {
417            format!("Zed Nightly {}", sha.chars().take(7).collect::<String>())
418        } else {
419            panic.app_version
420        };
421
422        let payload = slack::WebhookBody::new(|w| {
423            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
424                .add_section(|s| {
425                    s.add_field(slack::Text::markdown(format!("*Version:*\n {version} ",)))
426                        .add_field({
427                            let hostname = app.config.blob_store_url.clone().unwrap_or_default();
428                            let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
429                                hostname.strip_prefix("http://").unwrap_or_default()
430                            });
431
432                            slack::Text::markdown(format!(
433                                "*{} {}:*\n<https://{}.{}/{}.json|{}…>",
434                                panic.os_name,
435                                panic.os_version.unwrap_or_default(),
436                                CRASH_REPORTS_BUCKET,
437                                hostname,
438                                incident_id,
439                                incident_id.chars().take(8).collect::<String>(),
440                            ))
441                        })
442                })
443                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
444        });
445        let payload_json = serde_json::to_string(&payload).map_err(|err| {
446            log::error!("Failed to serialize payload to JSON: {err}");
447            Error::Internal(anyhow!(err))
448        })?;
449
450        reqwest::Client::new()
451            .post(slack_panics_webhook)
452            .header("Content-Type", "application/json")
453            .body(payload_json)
454            .send()
455            .await
456            .map_err(|err| {
457                log::error!("Failed to send payload to Slack: {err}");
458                Error::Internal(anyhow!(err))
459            })?;
460    }
461
462    Ok(())
463}
464
465fn report_to_slack(panic: &Panic) -> bool {
466    // Panics on macOS should make their way to Slack as a crash report,
467    // so we don't need to send them a second time via this channel.
468    if panic.os_name == "macOS" {
469        return false;
470    }
471
472    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
473        return false;
474    }
475
476    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
477        return false;
478    }
479
480    if panic
481        .payload
482        .contains("GPU has crashed, and no debug information is available")
483    {
484        return false;
485    }
486
487    true
488}
489
490pub async fn post_events(
491    Extension(app): Extension<Arc<AppState>>,
492    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
493    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
494    body: Bytes,
495) -> Result<()> {
496    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
497        return Err(Error::http(
498            StatusCode::INTERNAL_SERVER_ERROR,
499            "events not enabled".into(),
500        ))?;
501    };
502
503    let checksum_matched = checksum == expected;
504
505    let request_body: telemetry_events::EventRequestBody =
506        serde_json::from_slice(&body).map_err(|err| {
507            log::error!("can't parse event json: {err}");
508            Error::Internal(anyhow!(err))
509        })?;
510
511    let Some(last_event) = request_body.events.last() else {
512        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
513    };
514    let country_code = country_code_header.map(|h| h.to_string());
515
516    let first_event_at = chrono::Utc::now()
517        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
518
519    if let Some(kinesis_client) = app.kinesis_client.clone()
520        && let Some(stream) = app.config.kinesis_stream.clone() {
521            let mut request = kinesis_client.put_records().stream_name(stream);
522            let mut has_records = false;
523            for row in for_snowflake(
524                request_body.clone(),
525                first_event_at,
526                country_code.clone(),
527                checksum_matched,
528            ) {
529                if let Some(data) = serde_json::to_vec(&row).log_err() {
530                    request = request.records(
531                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
532                            .partition_key(request_body.system_id.clone().unwrap_or_default())
533                            .data(data.into())
534                            .build()
535                            .unwrap(),
536                    );
537                    has_records = true;
538                }
539            }
540            if has_records {
541                request.send().await.log_err();
542            }
543        };
544
545    Ok(())
546}
547
548pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
549    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
550
551    let mut summer = Sha256::new();
552    summer.update(checksum_seed);
553    summer.update(json);
554    summer.update(checksum_seed);
555    Some(summer.finalize().into_iter().collect())
556}
557
558fn for_snowflake(
559    body: EventRequestBody,
560    first_event_at: chrono::DateTime<chrono::Utc>,
561    country_code: Option<String>,
562    checksum_matched: bool,
563) -> impl Iterator<Item = SnowflakeRow> {
564    body.events.into_iter().map(move |event| {
565        let timestamp =
566            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
567        let (event_type, mut event_properties) = match &event.event {
568            Event::Flexible(e) => (
569                e.event_type.clone(),
570                serde_json::to_value(&e.event_properties).unwrap(),
571            ),
572        };
573
574        if let serde_json::Value::Object(ref mut map) = event_properties {
575            map.insert("app_version".to_string(), body.app_version.clone().into());
576            map.insert("os_name".to_string(), body.os_name.clone().into());
577            map.insert("os_version".to_string(), body.os_version.clone().into());
578            map.insert("architecture".to_string(), body.architecture.clone().into());
579            map.insert(
580                "release_channel".to_string(),
581                body.release_channel.clone().into(),
582            );
583            map.insert("signed_in".to_string(), event.signed_in.into());
584            map.insert("checksum_matched".to_string(), checksum_matched.into());
585            if let Some(country_code) = country_code.as_ref() {
586                map.insert("country".to_string(), country_code.clone().into());
587            }
588        }
589
590        // NOTE: most amplitude user properties are read out of our event_properties
591        // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
592        // for how that is configured.
593        let user_properties = body.is_staff.map(|is_staff| {
594            serde_json::json!({
595                "is_staff": is_staff,
596            })
597        });
598
599        SnowflakeRow {
600            time: timestamp,
601            user_id: body.metrics_id.clone(),
602            device_id: body.system_id.clone(),
603            event_type,
604            event_properties,
605            user_properties,
606            insert_id: Some(Uuid::new_v4().to_string()),
607        }
608    })
609}
610
611#[derive(Serialize, Deserialize, Debug)]
612pub struct SnowflakeRow {
613    pub time: chrono::DateTime<chrono::Utc>,
614    pub user_id: Option<String>,
615    pub device_id: Option<String>,
616    pub event_type: String,
617    pub event_properties: serde_json::Value,
618    pub user_properties: Option<serde_json::Value>,
619    pub insert_id: Option<String>,
620}
621
622impl SnowflakeRow {
623    pub fn new(
624        event_type: impl Into<String>,
625        metrics_id: Option<Uuid>,
626        is_staff: bool,
627        system_id: Option<String>,
628        event_properties: serde_json::Value,
629    ) -> Self {
630        Self {
631            time: chrono::Utc::now(),
632            event_type: event_type.into(),
633            device_id: system_id,
634            user_id: metrics_id.map(|id| id.to_string()),
635            insert_id: Some(uuid::Uuid::new_v4().to_string()),
636            event_properties,
637            user_properties: Some(json!({"is_staff": is_staff})),
638        }
639    }
640
641    pub async fn write(
642        self,
643        client: &Option<aws_sdk_kinesis::Client>,
644        stream: &Option<String>,
645    ) -> anyhow::Result<()> {
646        let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
647            return Ok(());
648        };
649        let row = serde_json::to_vec(&self)?;
650        client
651            .put_record()
652            .stream_name(stream)
653            .partition_key(&self.user_id.unwrap_or_default())
654            .data(row.into())
655            .send()
656            .await?;
657        Ok(())
658    }
659}