1use super::ips_file::IpsFile;
2use crate::api::CloudflareIpCountryHeader;
3use crate::{AppState, Error, Result, api::slack};
4use anyhow::anyhow;
5use aws_sdk_s3::primitives::ByteStream;
6use axum::{
7 Extension, Router, TypedHeader,
8 body::Bytes,
9 headers::Header,
10 http::{HeaderMap, HeaderName, StatusCode},
11 routing::post,
12};
13use chrono::Duration;
14use semantic_version::SemanticVersion;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use sha2::{Digest, Sha256};
18use std::sync::{Arc, OnceLock};
19use telemetry_events::{Event, EventRequestBody, Panic};
20use util::ResultExt;
21use uuid::Uuid;
22
/// Name of the blob-store bucket that crash (`.ips`), hang (`.hang.json`) and
/// panic (`.json`) reports are uploaded to. It is also used as the leading
/// host label when building the report links posted to Slack.
const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
24
25pub fn router() -> Router {
26 Router::new()
27 .route("/telemetry/events", post(post_events))
28 .route("/telemetry/crashes", post(post_crash))
29 .route("/telemetry/panics", post(post_panic))
30 .route("/telemetry/hangs", post(post_hang))
31}
32
33pub struct ZedChecksumHeader(Vec<u8>);
34
35impl Header for ZedChecksumHeader {
36 fn name() -> &'static HeaderName {
37 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
38 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
39 }
40
41 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
42 where
43 Self: Sized,
44 I: Iterator<Item = &'i axum::http::HeaderValue>,
45 {
46 let checksum = values
47 .next()
48 .ok_or_else(axum::headers::Error::invalid)?
49 .to_str()
50 .map_err(|_| axum::headers::Error::invalid())?;
51
52 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
53 Ok(Self(bytes))
54 }
55
56 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
57 unimplemented!()
58 }
59}
60
61pub async fn post_crash(
62 Extension(app): Extension<Arc<AppState>>,
63 headers: HeaderMap,
64 body: Bytes,
65) -> Result<()> {
66 let report = IpsFile::parse(&body)?;
67 let version_threshold = SemanticVersion::new(0, 123, 0);
68
69 let bundle_id = &report.header.bundle_id;
70 let app_version = &report.app_version();
71
72 if bundle_id == "dev.zed.Zed-Dev" {
73 log::error!("Crash uploads from {} are ignored.", bundle_id);
74 return Ok(());
75 }
76
77 if app_version.is_none() || app_version.unwrap() < version_threshold {
78 log::error!(
79 "Crash uploads from {} are ignored.",
80 report.header.app_version
81 );
82 return Ok(());
83 }
84 let app_version = app_version.unwrap();
85
86 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
87 let response = blob_store_client
88 .head_object()
89 .bucket(CRASH_REPORTS_BUCKET)
90 .key(report.header.incident_id.clone() + ".ips")
91 .send()
92 .await;
93
94 if response.is_ok() {
95 log::info!("We've already uploaded this crash");
96 return Ok(());
97 }
98
99 blob_store_client
100 .put_object()
101 .bucket(CRASH_REPORTS_BUCKET)
102 .key(report.header.incident_id.clone() + ".ips")
103 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
104 .body(ByteStream::from(body.to_vec()))
105 .send()
106 .await
107 .map_err(|e| log::error!("Failed to upload crash: {}", e))
108 .ok();
109 }
110
111 let recent_panic_on: Option<i64> = headers
112 .get("x-zed-panicked-on")
113 .and_then(|h| h.to_str().ok())
114 .and_then(|s| s.parse().ok());
115
116 let installation_id = headers
117 .get("x-zed-installation-id")
118 .and_then(|h| h.to_str().ok())
119 .map(|s| s.to_string())
120 .unwrap_or_default();
121
122 let mut recent_panic = None;
123
124 if let Some(recent_panic_on) = recent_panic_on {
125 let crashed_at = match report.timestamp() {
126 Ok(t) => Some(t),
127 Err(e) => {
128 log::error!("Can't parse {}: {}", report.header.timestamp, e);
129 None
130 }
131 };
132 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
133 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
134 }
135 }
136
137 let description = report.description(recent_panic);
138 let summary = report.backtrace_summary();
139
140 tracing::error!(
141 service = "client",
142 version = %report.header.app_version,
143 os_version = %report.header.os_version,
144 bundle_id = %report.header.bundle_id,
145 incident_id = %report.header.incident_id,
146 installation_id = %installation_id,
147 description = %description,
148 backtrace = %summary,
149 "crash report"
150 );
151
152 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
153 let payload = slack::WebhookBody::new(|w| {
154 w.add_section(|s| s.text(slack::Text::markdown(description)))
155 .add_section(|s| {
156 s.add_field(slack::Text::markdown(format!(
157 "*Version:*\n{} ({})",
158 bundle_id, app_version
159 )))
160 .add_field({
161 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
162 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
163 hostname.strip_prefix("http://").unwrap_or_default()
164 });
165
166 slack::Text::markdown(format!(
167 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
168 CRASH_REPORTS_BUCKET,
169 hostname,
170 report.header.incident_id,
171 report
172 .header
173 .incident_id
174 .chars()
175 .take(8)
176 .collect::<String>(),
177 ))
178 })
179 })
180 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
181 });
182 let payload_json = serde_json::to_string(&payload).map_err(|err| {
183 log::error!("Failed to serialize payload to JSON: {err}");
184 Error::Internal(anyhow!(err))
185 })?;
186
187 reqwest::Client::new()
188 .post(slack_panics_webhook)
189 .header("Content-Type", "application/json")
190 .body(payload_json)
191 .send()
192 .await
193 .map_err(|err| {
194 log::error!("Failed to send payload to Slack: {err}");
195 Error::Internal(anyhow!(err))
196 })?;
197 }
198
199 Ok(())
200}
201
202pub async fn post_hang(
203 Extension(app): Extension<Arc<AppState>>,
204 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
205 body: Bytes,
206) -> Result<()> {
207 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
208 return Err(Error::http(
209 StatusCode::INTERNAL_SERVER_ERROR,
210 "events not enabled".into(),
211 ))?;
212 };
213
214 if checksum != expected {
215 return Err(Error::http(
216 StatusCode::BAD_REQUEST,
217 "invalid checksum".into(),
218 ))?;
219 }
220
221 let incident_id = Uuid::new_v4().to_string();
222
223 // dump JSON into S3 so we can get frame offsets if we need to.
224 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
225 blob_store_client
226 .put_object()
227 .bucket(CRASH_REPORTS_BUCKET)
228 .key(incident_id.clone() + ".hang.json")
229 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
230 .body(ByteStream::from(body.to_vec()))
231 .send()
232 .await
233 .map_err(|e| log::error!("Failed to upload crash: {}", e))
234 .ok();
235 }
236
237 let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
238 log::error!("can't parse report json: {err}");
239 Error::Internal(anyhow!(err))
240 })?;
241
242 let mut backtrace = "Possible hang detected on main thread:".to_string();
243 let unknown = "<unknown>".to_string();
244 for frame in report.backtrace.iter() {
245 backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
246 }
247
248 tracing::error!(
249 service = "client",
250 version = %report.app_version.unwrap_or_default().to_string(),
251 os_name = %report.os_name,
252 os_version = report.os_version.unwrap_or_default().to_string(),
253 incident_id = %incident_id,
254 installation_id = %report.installation_id.unwrap_or_default(),
255 backtrace = %backtrace,
256 "hang report");
257
258 Ok(())
259}
260
261pub async fn post_panic(
262 Extension(app): Extension<Arc<AppState>>,
263 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
264 body: Bytes,
265) -> Result<()> {
266 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
267 return Err(Error::http(
268 StatusCode::INTERNAL_SERVER_ERROR,
269 "events not enabled".into(),
270 ))?;
271 };
272
273 if checksum != expected {
274 return Err(Error::http(
275 StatusCode::BAD_REQUEST,
276 "invalid checksum".into(),
277 ))?;
278 }
279
280 let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
281 .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
282 let incident_id = uuid::Uuid::new_v4().to_string();
283 let panic = report.panic;
284
285 if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
286 return Err(Error::http(
287 StatusCode::BAD_REQUEST,
288 "invalid os version".into(),
289 ))?;
290 }
291
292 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
293 let response = blob_store_client
294 .head_object()
295 .bucket(CRASH_REPORTS_BUCKET)
296 .key(incident_id.clone() + ".json")
297 .send()
298 .await;
299
300 if response.is_ok() {
301 log::info!("We've already uploaded this crash");
302 return Ok(());
303 }
304
305 blob_store_client
306 .put_object()
307 .bucket(CRASH_REPORTS_BUCKET)
308 .key(incident_id.clone() + ".json")
309 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
310 .body(ByteStream::from(body.to_vec()))
311 .send()
312 .await
313 .map_err(|e| log::error!("Failed to upload crash: {}", e))
314 .ok();
315 }
316
317 tracing::error!(
318 service = "client",
319 version = %panic.app_version,
320 os_name = %panic.os_name,
321 os_version = %panic.os_version.clone().unwrap_or_default(),
322 incident_id = %incident_id,
323 installation_id = %panic.installation_id.clone().unwrap_or_default(),
324 description = %panic.payload,
325 backtrace = %panic.backtrace.join("\n"),
326 "panic report"
327 );
328
329 let backtrace = if panic.backtrace.len() > 25 {
330 let total = panic.backtrace.len();
331 format!(
332 "{}\n and {} more",
333 panic
334 .backtrace
335 .iter()
336 .take(20)
337 .cloned()
338 .collect::<Vec<_>>()
339 .join("\n"),
340 total - 20
341 )
342 } else {
343 panic.backtrace.join("\n")
344 };
345
346 if !report_to_slack(&panic) {
347 return Ok(());
348 }
349
350 let backtrace_with_summary = panic.payload + "\n" + &backtrace;
351
352 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
353 let payload = slack::WebhookBody::new(|w| {
354 w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
355 .add_section(|s| {
356 s.add_field(slack::Text::markdown(format!(
357 "*Version:*\n {} ",
358 panic.app_version
359 )))
360 .add_field({
361 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
362 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
363 hostname.strip_prefix("http://").unwrap_or_default()
364 });
365
366 slack::Text::markdown(format!(
367 "*{} {}:*\n<https://{}.{}/{}.json|{}…>",
368 panic.os_name,
369 panic.os_version.unwrap_or_default(),
370 CRASH_REPORTS_BUCKET,
371 hostname,
372 incident_id,
373 incident_id.chars().take(8).collect::<String>(),
374 ))
375 })
376 })
377 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
378 });
379 let payload_json = serde_json::to_string(&payload).map_err(|err| {
380 log::error!("Failed to serialize payload to JSON: {err}");
381 Error::Internal(anyhow!(err))
382 })?;
383
384 reqwest::Client::new()
385 .post(slack_panics_webhook)
386 .header("Content-Type", "application/json")
387 .body(payload_json)
388 .send()
389 .await
390 .map_err(|err| {
391 log::error!("Failed to send payload to Slack: {err}");
392 Error::Internal(anyhow!(err))
393 })?;
394 }
395
396 Ok(())
397}
398
399fn report_to_slack(panic: &Panic) -> bool {
400 // Panics on macOS should make their way to Slack as a crash report,
401 // so we don't need to send them a second time via this channel.
402 if panic.os_name == "macOS" {
403 return false;
404 }
405
406 if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
407 return false;
408 }
409
410 if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
411 return false;
412 }
413
414 if panic
415 .payload
416 .contains("GPU has crashed, and no debug information is available")
417 {
418 return false;
419 }
420
421 true
422}
423
424pub async fn post_events(
425 Extension(app): Extension<Arc<AppState>>,
426 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
427 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
428 body: Bytes,
429) -> Result<()> {
430 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
431 return Err(Error::http(
432 StatusCode::INTERNAL_SERVER_ERROR,
433 "events not enabled".into(),
434 ))?;
435 };
436
437 let checksum_matched = checksum == expected;
438
439 let request_body: telemetry_events::EventRequestBody =
440 serde_json::from_slice(&body).map_err(|err| {
441 log::error!("can't parse event json: {err}");
442 Error::Internal(anyhow!(err))
443 })?;
444
445 let Some(last_event) = request_body.events.last() else {
446 return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
447 };
448 let country_code = country_code_header.map(|h| h.to_string());
449
450 let first_event_at = chrono::Utc::now()
451 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
452
453 if let Some(kinesis_client) = app.kinesis_client.clone() {
454 if let Some(stream) = app.config.kinesis_stream.clone() {
455 let mut request = kinesis_client.put_records().stream_name(stream);
456 for row in for_snowflake(
457 request_body.clone(),
458 first_event_at,
459 country_code.clone(),
460 checksum_matched,
461 ) {
462 if let Some(data) = serde_json::to_vec(&row).log_err() {
463 request = request.records(
464 aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
465 .partition_key(request_body.system_id.clone().unwrap_or_default())
466 .data(data.into())
467 .build()
468 .unwrap(),
469 );
470 }
471 }
472 request.send().await.log_err();
473 }
474 };
475
476 Ok(())
477}
478
479pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
480 let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
481
482 let mut summer = Sha256::new();
483 summer.update(checksum_seed);
484 summer.update(json);
485 summer.update(checksum_seed);
486 Some(summer.finalize().into_iter().collect())
487}
488
489fn for_snowflake(
490 body: EventRequestBody,
491 first_event_at: chrono::DateTime<chrono::Utc>,
492 country_code: Option<String>,
493 checksum_matched: bool,
494) -> impl Iterator<Item = SnowflakeRow> {
495 body.events.into_iter().flat_map(move |event| {
496 let timestamp =
497 first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
498 // We will need to double check, but I believe all of the events that
499 // are being transformed here are now migrated over to use the
500 // telemetry::event! macro, as of this commit so this code can go away
501 // when we feel enough users have upgraded past this point.
502 let (event_type, mut event_properties) = match &event.event {
503 Event::Editor(e) => (
504 match e.operation.as_str() {
505 "open" => "Editor Opened".to_string(),
506 "save" => "Editor Saved".to_string(),
507 _ => format!("Unknown Editor Event: {}", e.operation),
508 },
509 serde_json::to_value(e).unwrap(),
510 ),
511 Event::InlineCompletion(e) => (
512 format!(
513 "Edit Prediction {}",
514 if e.suggestion_accepted {
515 "Accepted"
516 } else {
517 "Discarded"
518 }
519 ),
520 serde_json::to_value(e).unwrap(),
521 ),
522 Event::InlineCompletionRating(e) => (
523 "Edit Prediction Rated".to_string(),
524 serde_json::to_value(e).unwrap(),
525 ),
526 Event::Call(e) => {
527 let event_type = match e.operation.trim() {
528 "unshare project" => "Project Unshared".to_string(),
529 "open channel notes" => "Channel Notes Opened".to_string(),
530 "share project" => "Project Shared".to_string(),
531 "join channel" => "Channel Joined".to_string(),
532 "hang up" => "Call Ended".to_string(),
533 "accept incoming" => "Incoming Call Accepted".to_string(),
534 "invite" => "Participant Invited".to_string(),
535 "disable microphone" => "Microphone Disabled".to_string(),
536 "enable microphone" => "Microphone Enabled".to_string(),
537 "enable screen share" => "Screen Share Enabled".to_string(),
538 "disable screen share" => "Screen Share Disabled".to_string(),
539 "decline incoming" => "Incoming Call Declined".to_string(),
540 _ => format!("Unknown Call Event: {}", e.operation),
541 };
542
543 (event_type, serde_json::to_value(e).unwrap())
544 }
545 Event::Assistant(e) => (
546 match e.phase {
547 telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
548 telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
549 telemetry_events::AssistantPhase::Accepted => {
550 "Assistant Response Accepted".to_string()
551 }
552 telemetry_events::AssistantPhase::Rejected => {
553 "Assistant Response Rejected".to_string()
554 }
555 },
556 serde_json::to_value(e).unwrap(),
557 ),
558 Event::Cpu(_) | Event::Memory(_) => return None,
559 Event::App(e) => {
560 let mut properties = json!({});
561 let event_type = match e.operation.trim() {
562 // App
563 "open" => "App Opened".to_string(),
564 "first open" => "App First Opened".to_string(),
565 "first open for release channel" => {
566 "App First Opened For Release Channel".to_string()
567 }
568 "close" => "App Closed".to_string(),
569
570 // Project
571 "open project" => "Project Opened".to_string(),
572 "open node project" => {
573 properties["project_type"] = json!("node");
574 "Project Opened".to_string()
575 }
576 "open pnpm project" => {
577 properties["project_type"] = json!("pnpm");
578 "Project Opened".to_string()
579 }
580 "open yarn project" => {
581 properties["project_type"] = json!("yarn");
582 "Project Opened".to_string()
583 }
584
585 // SSH
586 "create ssh server" => "SSH Server Created".to_string(),
587 "create ssh project" => "SSH Project Created".to_string(),
588 "open ssh project" => "SSH Project Opened".to_string(),
589
590 // Welcome Page
591 "welcome page: change keymap" => "Welcome Keymap Changed".to_string(),
592 "welcome page: change theme" => "Welcome Theme Changed".to_string(),
593 "welcome page: close" => "Welcome Page Closed".to_string(),
594 "welcome page: edit settings" => "Welcome Settings Edited".to_string(),
595 "welcome page: install cli" => "Welcome CLI Installed".to_string(),
596 "welcome page: open" => "Welcome Page Opened".to_string(),
597 "welcome page: open extensions" => "Welcome Extensions Page Opened".to_string(),
598 "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
599 "welcome page: toggle diagnostic telemetry" => {
600 "Welcome Diagnostic Telemetry Toggled".to_string()
601 }
602 "welcome page: toggle metric telemetry" => {
603 "Welcome Metric Telemetry Toggled".to_string()
604 }
605 "welcome page: toggle vim" => "Welcome Vim Mode Toggled".to_string(),
606 "welcome page: view docs" => "Welcome Documentation Viewed".to_string(),
607
608 // Extensions
609 "extensions page: open" => "Extensions Page Opened".to_string(),
610 "extensions: install extension" => "Extension Installed".to_string(),
611 "extensions: uninstall extension" => "Extension Uninstalled".to_string(),
612
613 // Misc
614 "markdown preview: open" => "Markdown Preview Opened".to_string(),
615 "project diagnostics: open" => "Project Diagnostics Opened".to_string(),
616 "project search: open" => "Project Search Opened".to_string(),
617 "repl sessions: open" => "REPL Session Started".to_string(),
618
619 // Feature Upsell
620 "feature upsell: toggle vim" => {
621 properties["source"] = json!("Feature Upsell");
622 "Vim Mode Toggled".to_string()
623 }
624 _ => e
625 .operation
626 .strip_prefix("feature upsell: viewed docs (")
627 .and_then(|s| s.strip_suffix(')'))
628 .map_or_else(
629 || format!("Unknown App Event: {}", e.operation),
630 |docs_url| {
631 properties["url"] = json!(docs_url);
632 properties["source"] = json!("Feature Upsell");
633 "Documentation Viewed".to_string()
634 },
635 ),
636 };
637 (event_type, properties)
638 }
639 Event::Setting(e) => (
640 "Settings Changed".to_string(),
641 serde_json::to_value(e).unwrap(),
642 ),
643 Event::Extension(e) => (
644 "Extension Loaded".to_string(),
645 serde_json::to_value(e).unwrap(),
646 ),
647 Event::Edit(e) => (
648 "Editor Edited".to_string(),
649 serde_json::to_value(e).unwrap(),
650 ),
651 Event::Action(e) => (
652 "Action Invoked".to_string(),
653 serde_json::to_value(e).unwrap(),
654 ),
655 Event::Repl(e) => (
656 "Kernel Status Changed".to_string(),
657 serde_json::to_value(e).unwrap(),
658 ),
659 Event::Flexible(e) => (
660 e.event_type.clone(),
661 serde_json::to_value(&e.event_properties).unwrap(),
662 ),
663 };
664
665 if let serde_json::Value::Object(ref mut map) = event_properties {
666 map.insert("app_version".to_string(), body.app_version.clone().into());
667 map.insert("os_name".to_string(), body.os_name.clone().into());
668 map.insert("os_version".to_string(), body.os_version.clone().into());
669 map.insert("architecture".to_string(), body.architecture.clone().into());
670 map.insert(
671 "release_channel".to_string(),
672 body.release_channel.clone().into(),
673 );
674 map.insert("signed_in".to_string(), event.signed_in.into());
675 map.insert("checksum_matched".to_string(), checksum_matched.into());
676 if let Some(country_code) = country_code.as_ref() {
677 map.insert("country".to_string(), country_code.clone().into());
678 }
679 }
680
681 // NOTE: most amplitude user properties are read out of our event_properties
682 // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
683 // for how that is configured.
684 let user_properties = Some(serde_json::json!({
685 "is_staff": body.is_staff,
686 }));
687
688 Some(SnowflakeRow {
689 time: timestamp,
690 user_id: body.metrics_id.clone(),
691 device_id: body.system_id.clone(),
692 event_type,
693 event_properties,
694 user_properties,
695 insert_id: Some(Uuid::new_v4().to_string()),
696 })
697 })
698}
699
700#[derive(Serialize, Deserialize, Debug)]
701pub struct SnowflakeRow {
702 pub time: chrono::DateTime<chrono::Utc>,
703 pub user_id: Option<String>,
704 pub device_id: Option<String>,
705 pub event_type: String,
706 pub event_properties: serde_json::Value,
707 pub user_properties: Option<serde_json::Value>,
708 pub insert_id: Option<String>,
709}
710
711impl SnowflakeRow {
712 pub fn new(
713 event_type: impl Into<String>,
714 metrics_id: Uuid,
715 is_staff: bool,
716 system_id: Option<String>,
717 event_properties: serde_json::Value,
718 ) -> Self {
719 Self {
720 time: chrono::Utc::now(),
721 event_type: event_type.into(),
722 device_id: system_id,
723 user_id: Some(metrics_id.to_string()),
724 insert_id: Some(uuid::Uuid::new_v4().to_string()),
725 event_properties,
726 user_properties: Some(json!({"is_staff": is_staff})),
727 }
728 }
729
730 pub async fn write(
731 self,
732 client: &Option<aws_sdk_kinesis::Client>,
733 stream: &Option<String>,
734 ) -> anyhow::Result<()> {
735 let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
736 return Ok(());
737 };
738 let row = serde_json::to_vec(&self)?;
739 client
740 .put_record()
741 .stream_name(stream)
742 .partition_key(&self.user_id.unwrap_or_default())
743 .data(row.into())
744 .send()
745 .await?;
746 Ok(())
747 }
748}