1use super::ips_file::IpsFile;
2use crate::api::CloudflareIpCountryHeader;
3use crate::{AppState, Error, Result, api::slack};
4use anyhow::anyhow;
5use aws_sdk_s3::primitives::ByteStream;
6use axum::{
7 Extension, Router, TypedHeader,
8 body::Bytes,
9 headers::Header,
10 http::{HeaderMap, HeaderName, StatusCode},
11 routing::post,
12};
13use chrono::Duration;
14use semantic_version::SemanticVersion;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use sha2::{Digest, Sha256};
18use std::sync::{Arc, OnceLock};
19use telemetry_events::{Event, EventRequestBody, Panic};
20use util::ResultExt;
21use uuid::Uuid;
22
23const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
24
25pub fn router() -> Router {
26 Router::new()
27 .route("/telemetry/events", post(post_events))
28 .route("/telemetry/crashes", post(post_crash))
29 .route("/telemetry/panics", post(post_panic))
30 .route("/telemetry/hangs", post(post_hang))
31}
32
33pub struct ZedChecksumHeader(Vec<u8>);
34
35impl Header for ZedChecksumHeader {
36 fn name() -> &'static HeaderName {
37 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
38 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
39 }
40
41 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
42 where
43 Self: Sized,
44 I: Iterator<Item = &'i axum::http::HeaderValue>,
45 {
46 let checksum = values
47 .next()
48 .ok_or_else(axum::headers::Error::invalid)?
49 .to_str()
50 .map_err(|_| axum::headers::Error::invalid())?;
51
52 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
53 Ok(Self(bytes))
54 }
55
56 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
57 unimplemented!()
58 }
59}
60
61pub async fn post_crash(
62 Extension(app): Extension<Arc<AppState>>,
63 headers: HeaderMap,
64 body: Bytes,
65) -> Result<()> {
66 let report = IpsFile::parse(&body)?;
67 let version_threshold = SemanticVersion::new(0, 123, 0);
68
69 let bundle_id = &report.header.bundle_id;
70 let app_version = &report.app_version();
71
72 if bundle_id == "dev.zed.Zed-Dev" {
73 log::error!("Crash uploads from {} are ignored.", bundle_id);
74 return Ok(());
75 }
76
77 if app_version.is_none() || app_version.unwrap() < version_threshold {
78 log::error!(
79 "Crash uploads from {} are ignored.",
80 report.header.app_version
81 );
82 return Ok(());
83 }
84 let app_version = app_version.unwrap();
85
86 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
87 let response = blob_store_client
88 .head_object()
89 .bucket(CRASH_REPORTS_BUCKET)
90 .key(report.header.incident_id.clone() + ".ips")
91 .send()
92 .await;
93
94 if response.is_ok() {
95 log::info!("We've already uploaded this crash");
96 return Ok(());
97 }
98
99 blob_store_client
100 .put_object()
101 .bucket(CRASH_REPORTS_BUCKET)
102 .key(report.header.incident_id.clone() + ".ips")
103 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
104 .body(ByteStream::from(body.to_vec()))
105 .send()
106 .await
107 .map_err(|e| log::error!("Failed to upload crash: {}", e))
108 .ok();
109 }
110
111 let recent_panic_on: Option<i64> = headers
112 .get("x-zed-panicked-on")
113 .and_then(|h| h.to_str().ok())
114 .and_then(|s| s.parse().ok());
115
116 let installation_id = headers
117 .get("x-zed-installation-id")
118 .and_then(|h| h.to_str().ok())
119 .map(|s| s.to_string())
120 .unwrap_or_default();
121
122 let mut recent_panic = None;
123
124 if let Some(recent_panic_on) = recent_panic_on {
125 let crashed_at = match report.timestamp() {
126 Ok(t) => Some(t),
127 Err(e) => {
128 log::error!("Can't parse {}: {}", report.header.timestamp, e);
129 None
130 }
131 };
132 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
133 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
134 }
135 }
136
137 let description = report.description(recent_panic);
138 let summary = report.backtrace_summary();
139
140 tracing::error!(
141 service = "client",
142 version = %report.header.app_version,
143 os_version = %report.header.os_version,
144 bundle_id = %report.header.bundle_id,
145 incident_id = %report.header.incident_id,
146 installation_id = %installation_id,
147 description = %description,
148 backtrace = %summary,
149 "crash report"
150 );
151
152 if let Some(kinesis_client) = app.kinesis_client.clone() {
153 if let Some(stream) = app.config.kinesis_stream.clone() {
154 let properties = json!({
155 "app_version": report.header.app_version,
156 "os_version": report.header.os_version,
157 "os_name": "macOS",
158 "bundle_id": report.header.bundle_id,
159 "incident_id": report.header.incident_id,
160 "installation_id": installation_id,
161 "description": description,
162 "backtrace": summary,
163 });
164 let row = SnowflakeRow::new(
165 "Crash Reported",
166 None,
167 false,
168 Some(installation_id),
169 properties,
170 );
171 let data = serde_json::to_vec(&row)?;
172 kinesis_client
173 .put_record()
174 .stream_name(stream)
175 .partition_key(row.insert_id.unwrap_or_default())
176 .data(data.into())
177 .send()
178 .await
179 .log_err();
180 }
181 }
182
183 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
184 let payload = slack::WebhookBody::new(|w| {
185 w.add_section(|s| s.text(slack::Text::markdown(description)))
186 .add_section(|s| {
187 s.add_field(slack::Text::markdown(format!(
188 "*Version:*\n{} ({})",
189 bundle_id, app_version
190 )))
191 .add_field({
192 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
193 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
194 hostname.strip_prefix("http://").unwrap_or_default()
195 });
196
197 slack::Text::markdown(format!(
198 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
199 CRASH_REPORTS_BUCKET,
200 hostname,
201 report.header.incident_id,
202 report
203 .header
204 .incident_id
205 .chars()
206 .take(8)
207 .collect::<String>(),
208 ))
209 })
210 })
211 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
212 });
213 let payload_json = serde_json::to_string(&payload).map_err(|err| {
214 log::error!("Failed to serialize payload to JSON: {err}");
215 Error::Internal(anyhow!(err))
216 })?;
217
218 reqwest::Client::new()
219 .post(slack_panics_webhook)
220 .header("Content-Type", "application/json")
221 .body(payload_json)
222 .send()
223 .await
224 .map_err(|err| {
225 log::error!("Failed to send payload to Slack: {err}");
226 Error::Internal(anyhow!(err))
227 })?;
228 }
229
230 Ok(())
231}
232
233pub async fn post_hang(
234 Extension(app): Extension<Arc<AppState>>,
235 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
236 body: Bytes,
237) -> Result<()> {
238 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
239 return Err(Error::http(
240 StatusCode::INTERNAL_SERVER_ERROR,
241 "events not enabled".into(),
242 ))?;
243 };
244
245 if checksum != expected {
246 return Err(Error::http(
247 StatusCode::BAD_REQUEST,
248 "invalid checksum".into(),
249 ))?;
250 }
251
252 let incident_id = Uuid::new_v4().to_string();
253
254 // dump JSON into S3 so we can get frame offsets if we need to.
255 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
256 blob_store_client
257 .put_object()
258 .bucket(CRASH_REPORTS_BUCKET)
259 .key(incident_id.clone() + ".hang.json")
260 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
261 .body(ByteStream::from(body.to_vec()))
262 .send()
263 .await
264 .map_err(|e| log::error!("Failed to upload crash: {}", e))
265 .ok();
266 }
267
268 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
269 log::error!("can't parse report json: {err}");
270 Error::Internal(anyhow!(err))
271 })?;
272
273 let mut backtrace = "Possible hang detected on main thread:".to_string();
274 let unknown = "<unknown>".to_string();
275 for frame in report.backtrace.iter() {
276 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
277 }
278
279 tracing::error!(
280 service = "client",
281 version = %report.app_version.unwrap_or_default().to_string(),
282 os_name = %report.os_name,
283 os_version = report.os_version.unwrap_or_default().to_string(),
284 incident_id = %incident_id,
285 installation_id = %report.installation_id.unwrap_or_default(),
286 backtrace = %backtrace,
287 "hang report");
288
289 Ok(())
290}
291
292pub async fn post_panic(
293 Extension(app): Extension<Arc<AppState>>,
294 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
295 body: Bytes,
296) -> Result<()> {
297 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
298 return Err(Error::http(
299 StatusCode::INTERNAL_SERVER_ERROR,
300 "events not enabled".into(),
301 ))?;
302 };
303
304 if checksum != expected {
305 return Err(Error::http(
306 StatusCode::BAD_REQUEST,
307 "invalid checksum".into(),
308 ))?;
309 }
310
311 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
312 .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
313 let incident_id = uuid::Uuid::new_v4().to_string();
314 let panic = report.panic;
315
316 if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
317 return Err(Error::http(
318 StatusCode::BAD_REQUEST,
319 "invalid os version".into(),
320 ))?;
321 }
322
323 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
324 let response = blob_store_client
325 .head_object()
326 .bucket(CRASH_REPORTS_BUCKET)
327 .key(incident_id.clone() + ".json")
328 .send()
329 .await;
330
331 if response.is_ok() {
332 log::info!("We've already uploaded this crash");
333 return Ok(());
334 }
335
336 blob_store_client
337 .put_object()
338 .bucket(CRASH_REPORTS_BUCKET)
339 .key(incident_id.clone() + ".json")
340 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
341 .body(ByteStream::from(body.to_vec()))
342 .send()
343 .await
344 .map_err(|e| log::error!("Failed to upload crash: {}", e))
345 .ok();
346 }
347
348 let backtrace = panic.backtrace.join("\n");
349
350 tracing::error!(
351 service = "client",
352 version = %panic.app_version,
353 os_name = %panic.os_name,
354 os_version = %panic.os_version.clone().unwrap_or_default(),
355 incident_id = %incident_id,
356 installation_id = %panic.installation_id.clone().unwrap_or_default(),
357 description = %panic.payload,
358 backtrace = %backtrace,
359 "panic report"
360 );
361
362 if let Some(kinesis_client) = app.kinesis_client.clone() {
363 if let Some(stream) = app.config.kinesis_stream.clone() {
364 let properties = json!({
365 "app_version": panic.app_version,
366 "os_name": panic.os_name,
367 "os_version": panic.os_version,
368 "incident_id": incident_id,
369 "installation_id": panic.installation_id,
370 "description": panic.payload,
371 "backtrace": backtrace,
372 });
373 let row = SnowflakeRow::new(
374 "Panic Reported",
375 None,
376 false,
377 panic.installation_id.clone(),
378 properties,
379 );
380 let data = serde_json::to_vec(&row)?;
381 kinesis_client
382 .put_record()
383 .stream_name(stream)
384 .partition_key(row.insert_id.unwrap_or_default())
385 .data(data.into())
386 .send()
387 .await
388 .log_err();
389 }
390 }
391
392 let backtrace = if panic.backtrace.len() > 25 {
393 let total = panic.backtrace.len();
394 format!(
395 "{}\n and {} more",
396 panic
397 .backtrace
398 .iter()
399 .take(20)
400 .cloned()
401 .collect::<Vec<_>>()
402 .join("\n"),
403 total - 20
404 )
405 } else {
406 panic.backtrace.join("\n")
407 };
408
409 if !report_to_slack(&panic) {
410 return Ok(());
411 }
412
413 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
414
415 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
416 let payload = slack::WebhookBody::new(|w| {
417 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
418 .add_section(|s| {
419 s.add_field(slack::Text::markdown(format!(
420 "*Version:*\n {} ",
421 panic.app_version
422 )))
423 .add_field({
424 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
425 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
426 hostname.strip_prefix("http://").unwrap_or_default()
427 });
428
429 slack::Text::markdown(format!(
430 "*{} {}:*\n<https://{}.{}/{}.json|{}…>",
431 panic.os_name,
432 panic.os_version.unwrap_or_default(),
433 CRASH_REPORTS_BUCKET,
434 hostname,
435 incident_id,
436 incident_id.chars().take(8).collect::<String>(),
437 ))
438 })
439 })
440 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
441 });
442 let payload_json = serde_json::to_string(&payload).map_err(|err| {
443 log::error!("Failed to serialize payload to JSON: {err}");
444 Error::Internal(anyhow!(err))
445 })?;
446
447 reqwest::Client::new()
448 .post(slack_panics_webhook)
449 .header("Content-Type", "application/json")
450 .body(payload_json)
451 .send()
452 .await
453 .map_err(|err| {
454 log::error!("Failed to send payload to Slack: {err}");
455 Error::Internal(anyhow!(err))
456 })?;
457 }
458
459 Ok(())
460}
461
462fn report_to_slack(panic: &Panic) -> bool {
463 // Panics on macOS should make their way to Slack as a crash report,
464 // so we don't need to send them a second time via this channel.
465 if panic.os_name == "macOS" {
466 return false;
467 }
468
469 if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
470 return false;
471 }
472
473 if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
474 return false;
475 }
476
477 if panic
478 .payload
479 .contains("GPU has crashed, and no debug information is available")
480 {
481 return false;
482 }
483
484 true
485}
486
487pub async fn post_events(
488 Extension(app): Extension<Arc<AppState>>,
489 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
490 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
491 body: Bytes,
492) -> Result<()> {
493 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
494 return Err(Error::http(
495 StatusCode::INTERNAL_SERVER_ERROR,
496 "events not enabled".into(),
497 ))?;
498 };
499
500 let checksum_matched = checksum == expected;
501
502 let request_body: telemetry_events::EventRequestBody =
503 serde_json::from_slice(&body).map_err(|err| {
504 log::error!("can't parse event json: {err}");
505 Error::Internal(anyhow!(err))
506 })?;
507
508 let Some(last_event) = request_body.events.last() else {
509 return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
510 };
511 let country_code = country_code_header.map(|h| h.to_string());
512
513 let first_event_at = chrono::Utc::now()
514 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
515
516 if let Some(kinesis_client) = app.kinesis_client.clone() {
517 if let Some(stream) = app.config.kinesis_stream.clone() {
518 let mut request = kinesis_client.put_records().stream_name(stream);
519 for row in for_snowflake(
520 request_body.clone(),
521 first_event_at,
522 country_code.clone(),
523 checksum_matched,
524 ) {
525 if let Some(data) = serde_json::to_vec(&row).log_err() {
526 request = request.records(
527 aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
528 .partition_key(request_body.system_id.clone().unwrap_or_default())
529 .data(data.into())
530 .build()
531 .unwrap(),
532 );
533 }
534 }
535 request.send().await.log_err();
536 }
537 };
538
539 Ok(())
540}
541
542pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
543 let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
544
545 let mut summer = Sha256::new();
546 summer.update(checksum_seed);
547 summer.update(json);
548 summer.update(checksum_seed);
549 Some(summer.finalize().into_iter().collect())
550}
551
552fn for_snowflake(
553 body: EventRequestBody,
554 first_event_at: chrono::DateTime<chrono::Utc>,
555 country_code: Option<String>,
556 checksum_matched: bool,
557) -> impl Iterator<Item = SnowflakeRow> {
558 body.events.into_iter().flat_map(move |event| {
559 let timestamp =
560 first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
561 // We will need to double check, but I believe all of the events that
562 // are being transformed here are now migrated over to use the
563 // telemetry::event! macro, as of this commit so this code can go away
564 // when we feel enough users have upgraded past this point.
565 let (event_type, mut event_properties) = match &event.event {
566 Event::Editor(e) => (
567 match e.operation.as_str() {
568 "open" => "Editor Opened".to_string(),
569 "save" => "Editor Saved".to_string(),
570 _ => format!("Unknown Editor Event: {}", e.operation),
571 },
572 serde_json::to_value(e).unwrap(),
573 ),
574 Event::InlineCompletion(e) => (
575 format!(
576 "Edit Prediction {}",
577 if e.suggestion_accepted {
578 "Accepted"
579 } else {
580 "Discarded"
581 }
582 ),
583 serde_json::to_value(e).unwrap(),
584 ),
585 Event::InlineCompletionRating(e) => (
586 "Edit Prediction Rated".to_string(),
587 serde_json::to_value(e).unwrap(),
588 ),
589 Event::Call(e) => {
590 let event_type = match e.operation.trim() {
591 "unshare project" => "Project Unshared".to_string(),
592 "open channel notes" => "Channel Notes Opened".to_string(),
593 "share project" => "Project Shared".to_string(),
594 "join channel" => "Channel Joined".to_string(),
595 "hang up" => "Call Ended".to_string(),
596 "accept incoming" => "Incoming Call Accepted".to_string(),
597 "invite" => "Participant Invited".to_string(),
598 "disable microphone" => "Microphone Disabled".to_string(),
599 "enable microphone" => "Microphone Enabled".to_string(),
600 "enable screen share" => "Screen Share Enabled".to_string(),
601 "disable screen share" => "Screen Share Disabled".to_string(),
602 "decline incoming" => "Incoming Call Declined".to_string(),
603 _ => format!("Unknown Call Event: {}", e.operation),
604 };
605
606 (event_type, serde_json::to_value(e).unwrap())
607 }
608 Event::Assistant(e) => (
609 match e.phase {
610 telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
611 telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
612 telemetry_events::AssistantPhase::Accepted => {
613 "Assistant Response Accepted".to_string()
614 }
615 telemetry_events::AssistantPhase::Rejected => {
616 "Assistant Response Rejected".to_string()
617 }
618 },
619 serde_json::to_value(e).unwrap(),
620 ),
621 Event::Cpu(_) | Event::Memory(_) => return None,
622 Event::App(e) => {
623 let mut properties = json!({});
624 let event_type = match e.operation.trim() {
625 // App
626 "open" => "App Opened".to_string(),
627 "first open" => "App First Opened".to_string(),
628 "first open for release channel" => {
629 "App First Opened For Release Channel".to_string()
630 }
631 "close" => "App Closed".to_string(),
632
633 // Project
634 "open project" => "Project Opened".to_string(),
635 "open node project" => {
636 properties["project_type"] = json!("node");
637 "Project Opened".to_string()
638 }
639 "open pnpm project" => {
640 properties["project_type"] = json!("pnpm");
641 "Project Opened".to_string()
642 }
643 "open yarn project" => {
644 properties["project_type"] = json!("yarn");
645 "Project Opened".to_string()
646 }
647
648 // SSH
649 "create ssh server" => "SSH Server Created".to_string(),
650 "create ssh project" => "SSH Project Created".to_string(),
651 "open ssh project" => "SSH Project Opened".to_string(),
652
653 // Welcome Page
654 "welcome page: change keymap" => "Welcome Keymap Changed".to_string(),
655 "welcome page: change theme" => "Welcome Theme Changed".to_string(),
656 "welcome page: close" => "Welcome Page Closed".to_string(),
657 "welcome page: edit settings" => "Welcome Settings Edited".to_string(),
658 "welcome page: install cli" => "Welcome CLI Installed".to_string(),
659 "welcome page: open" => "Welcome Page Opened".to_string(),
660 "welcome page: open extensions" => "Welcome Extensions Page Opened".to_string(),
661 "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
662 "welcome page: toggle diagnostic telemetry" => {
663 "Welcome Diagnostic Telemetry Toggled".to_string()
664 }
665 "welcome page: toggle metric telemetry" => {
666 "Welcome Metric Telemetry Toggled".to_string()
667 }
668 "welcome page: toggle vim" => "Welcome Vim Mode Toggled".to_string(),
669 "welcome page: view docs" => "Welcome Documentation Viewed".to_string(),
670
671 // Extensions
672 "extensions page: open" => "Extensions Page Opened".to_string(),
673 "extensions: install extension" => "Extension Installed".to_string(),
674 "extensions: uninstall extension" => "Extension Uninstalled".to_string(),
675
676 // Misc
677 "markdown preview: open" => "Markdown Preview Opened".to_string(),
678 "project diagnostics: open" => "Project Diagnostics Opened".to_string(),
679 "project search: open" => "Project Search Opened".to_string(),
680 "repl sessions: open" => "REPL Session Started".to_string(),
681
682 // Feature Upsell
683 "feature upsell: toggle vim" => {
684 properties["source"] = json!("Feature Upsell");
685 "Vim Mode Toggled".to_string()
686 }
687 _ => e
688 .operation
689 .strip_prefix("feature upsell: viewed docs (")
690 .and_then(|s| s.strip_suffix(')'))
691 .map_or_else(
692 || format!("Unknown App Event: {}", e.operation),
693 |docs_url| {
694 properties["url"] = json!(docs_url);
695 properties["source"] = json!("Feature Upsell");
696 "Documentation Viewed".to_string()
697 },
698 ),
699 };
700 (event_type, properties)
701 }
702 Event::Setting(e) => (
703 "Settings Changed".to_string(),
704 serde_json::to_value(e).unwrap(),
705 ),
706 Event::Extension(e) => (
707 "Extension Loaded".to_string(),
708 serde_json::to_value(e).unwrap(),
709 ),
710 Event::Edit(e) => (
711 "Editor Edited".to_string(),
712 serde_json::to_value(e).unwrap(),
713 ),
714 Event::Action(e) => (
715 "Action Invoked".to_string(),
716 serde_json::to_value(e).unwrap(),
717 ),
718 Event::Repl(e) => (
719 "Kernel Status Changed".to_string(),
720 serde_json::to_value(e).unwrap(),
721 ),
722 Event::Flexible(e) => (
723 e.event_type.clone(),
724 serde_json::to_value(&e.event_properties).unwrap(),
725 ),
726 };
727
728 if let serde_json::Value::Object(ref mut map) = event_properties {
729 map.insert("app_version".to_string(), body.app_version.clone().into());
730 map.insert("os_name".to_string(), body.os_name.clone().into());
731 map.insert("os_version".to_string(), body.os_version.clone().into());
732 map.insert("architecture".to_string(), body.architecture.clone().into());
733 map.insert(
734 "release_channel".to_string(),
735 body.release_channel.clone().into(),
736 );
737 map.insert("signed_in".to_string(), event.signed_in.into());
738 map.insert("checksum_matched".to_string(), checksum_matched.into());
739 if let Some(country_code) = country_code.as_ref() {
740 map.insert("country".to_string(), country_code.clone().into());
741 }
742 }
743
744 // NOTE: most amplitude user properties are read out of our event_properties
745 // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
746 // for how that is configured.
747 let user_properties = Some(serde_json::json!({
748 "is_staff": body.is_staff,
749 }));
750
751 Some(SnowflakeRow {
752 time: timestamp,
753 user_id: body.metrics_id.clone(),
754 device_id: body.system_id.clone(),
755 event_type,
756 event_properties,
757 user_properties,
758 insert_id: Some(Uuid::new_v4().to_string()),
759 })
760 })
761}
762
763#[derive(Serialize, Deserialize, Debug)]
764pub struct SnowflakeRow {
765 pub time: chrono::DateTime<chrono::Utc>,
766 pub user_id: Option<String>,
767 pub device_id: Option<String>,
768 pub event_type: String,
769 pub event_properties: serde_json::Value,
770 pub user_properties: Option<serde_json::Value>,
771 pub insert_id: Option<String>,
772}
773
774impl SnowflakeRow {
775 pub fn new(
776 event_type: impl Into<String>,
777 metrics_id: Option<Uuid>,
778 is_staff: bool,
779 system_id: Option<String>,
780 event_properties: serde_json::Value,
781 ) -> Self {
782 Self {
783 time: chrono::Utc::now(),
784 event_type: event_type.into(),
785 device_id: system_id,
786 user_id: metrics_id.map(|id| id.to_string()),
787 insert_id: Some(uuid::Uuid::new_v4().to_string()),
788 event_properties,
789 user_properties: Some(json!({"is_staff": is_staff})),
790 }
791 }
792
793 pub async fn write(
794 self,
795 client: &Option<aws_sdk_kinesis::Client>,
796 stream: &Option<String>,
797 ) -> anyhow::Result<()> {
798 let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
799 return Ok(());
800 };
801 let row = serde_json::to_vec(&self)?;
802 client
803 .put_record()
804 .stream_name(stream)
805 .partition_key(&self.user_id.unwrap_or_default())
806 .data(row.into())
807 .send()
808 .await?;
809 Ok(())
810 }
811}