events.rs

  1use super::ips_file::IpsFile;
  2use crate::api::CloudflareIpCountryHeader;
  3use crate::{AppState, Error, Result, api::slack};
  4use anyhow::anyhow;
  5use aws_sdk_s3::primitives::ByteStream;
  6use axum::{
  7    Extension, Router, TypedHeader,
  8    body::Bytes,
  9    headers::Header,
 10    http::{HeaderMap, HeaderName, StatusCode},
 11    routing::post,
 12};
 13use chrono::Duration;
 14use semantic_version::SemanticVersion;
 15use serde::{Deserialize, Serialize};
 16use serde_json::json;
 17use sha2::{Digest, Sha256};
 18use std::sync::{Arc, OnceLock};
 19use telemetry_events::{Event, EventRequestBody, Panic};
 20use util::ResultExt;
 21use uuid::Uuid;
 22
/// Name of the blob-store bucket that the handlers in this file upload crash
/// (`.ips`), hang (`.hang.json`) and panic (`.json`) reports to.
const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 24
 25pub fn router() -> Router {
 26    Router::new()
 27        .route("/telemetry/events", post(post_events))
 28        .route("/telemetry/crashes", post(post_crash))
 29        .route("/telemetry/panics", post(post_panic))
 30        .route("/telemetry/hangs", post(post_hang))
 31}
 32
 33pub struct ZedChecksumHeader(Vec<u8>);
 34
 35impl Header for ZedChecksumHeader {
 36    fn name() -> &'static HeaderName {
 37        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 38        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 39    }
 40
 41    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 42    where
 43        Self: Sized,
 44        I: Iterator<Item = &'i axum::http::HeaderValue>,
 45    {
 46        let checksum = values
 47            .next()
 48            .ok_or_else(axum::headers::Error::invalid)?
 49            .to_str()
 50            .map_err(|_| axum::headers::Error::invalid())?;
 51
 52        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 53        Ok(Self(bytes))
 54    }
 55
 56    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 57        unimplemented!()
 58    }
 59}
 60
 61pub async fn post_crash(
 62    Extension(app): Extension<Arc<AppState>>,
 63    headers: HeaderMap,
 64    body: Bytes,
 65) -> Result<()> {
 66    let report = IpsFile::parse(&body)?;
 67    let version_threshold = SemanticVersion::new(0, 123, 0);
 68
 69    let bundle_id = &report.header.bundle_id;
 70    let app_version = &report.app_version();
 71
 72    if bundle_id == "dev.zed.Zed-Dev" {
 73        log::error!("Crash uploads from {} are ignored.", bundle_id);
 74        return Ok(());
 75    }
 76
 77    if app_version.is_none() || app_version.unwrap() < version_threshold {
 78        log::error!(
 79            "Crash uploads from {} are ignored.",
 80            report.header.app_version
 81        );
 82        return Ok(());
 83    }
 84    let app_version = app_version.unwrap();
 85
 86    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 87        let response = blob_store_client
 88            .head_object()
 89            .bucket(CRASH_REPORTS_BUCKET)
 90            .key(report.header.incident_id.clone() + ".ips")
 91            .send()
 92            .await;
 93
 94        if response.is_ok() {
 95            log::info!("We've already uploaded this crash");
 96            return Ok(());
 97        }
 98
 99        blob_store_client
100            .put_object()
101            .bucket(CRASH_REPORTS_BUCKET)
102            .key(report.header.incident_id.clone() + ".ips")
103            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
104            .body(ByteStream::from(body.to_vec()))
105            .send()
106            .await
107            .map_err(|e| log::error!("Failed to upload crash: {}", e))
108            .ok();
109    }
110
111    let recent_panic_on: Option<i64> = headers
112        .get("x-zed-panicked-on")
113        .and_then(|h| h.to_str().ok())
114        .and_then(|s| s.parse().ok());
115
116    let installation_id = headers
117        .get("x-zed-installation-id")
118        .and_then(|h| h.to_str().ok())
119        .map(|s| s.to_string())
120        .unwrap_or_default();
121
122    let mut recent_panic = None;
123
124    if let Some(recent_panic_on) = recent_panic_on {
125        let crashed_at = match report.timestamp() {
126            Ok(t) => Some(t),
127            Err(e) => {
128                log::error!("Can't parse {}: {}", report.header.timestamp, e);
129                None
130            }
131        };
132        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
133            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
134        }
135    }
136
137    let description = report.description(recent_panic);
138    let summary = report.backtrace_summary();
139
140    tracing::error!(
141        service = "client",
142        version = %report.header.app_version,
143        os_version = %report.header.os_version,
144        bundle_id = %report.header.bundle_id,
145        incident_id = %report.header.incident_id,
146        installation_id = %installation_id,
147        description = %description,
148        backtrace = %summary,
149        "crash report"
150    );
151
152    if let Some(kinesis_client) = app.kinesis_client.clone() {
153        if let Some(stream) = app.config.kinesis_stream.clone() {
154            let properties = json!({
155                "app_version": report.header.app_version,
156                "os_version": report.header.os_version,
157                "os_name": "macOS",
158                "bundle_id": report.header.bundle_id,
159                "incident_id": report.header.incident_id,
160                "installation_id": installation_id,
161                "description": description,
162                "backtrace": summary,
163            });
164            let row = SnowflakeRow::new(
165                "Crash Reported",
166                None,
167                false,
168                Some(installation_id),
169                properties,
170            );
171            let data = serde_json::to_vec(&row)?;
172            kinesis_client
173                .put_record()
174                .stream_name(stream)
175                .partition_key(row.insert_id.unwrap_or_default())
176                .data(data.into())
177                .send()
178                .await
179                .log_err();
180        }
181    }
182
183    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
184        let payload = slack::WebhookBody::new(|w| {
185            w.add_section(|s| s.text(slack::Text::markdown(description)))
186                .add_section(|s| {
187                    s.add_field(slack::Text::markdown(format!(
188                        "*Version:*\n{} ({})",
189                        bundle_id, app_version
190                    )))
191                    .add_field({
192                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
193                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
194                            hostname.strip_prefix("http://").unwrap_or_default()
195                        });
196
197                        slack::Text::markdown(format!(
198                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
199                            CRASH_REPORTS_BUCKET,
200                            hostname,
201                            report.header.incident_id,
202                            report
203                                .header
204                                .incident_id
205                                .chars()
206                                .take(8)
207                                .collect::<String>(),
208                        ))
209                    })
210                })
211                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
212        });
213        let payload_json = serde_json::to_string(&payload).map_err(|err| {
214            log::error!("Failed to serialize payload to JSON: {err}");
215            Error::Internal(anyhow!(err))
216        })?;
217
218        reqwest::Client::new()
219            .post(slack_panics_webhook)
220            .header("Content-Type", "application/json")
221            .body(payload_json)
222            .send()
223            .await
224            .map_err(|err| {
225                log::error!("Failed to send payload to Slack: {err}");
226                Error::Internal(anyhow!(err))
227            })?;
228    }
229
230    Ok(())
231}
232
233pub async fn post_hang(
234    Extension(app): Extension<Arc<AppState>>,
235    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
236    body: Bytes,
237) -> Result<()> {
238    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
239        return Err(Error::http(
240            StatusCode::INTERNAL_SERVER_ERROR,
241            "events not enabled".into(),
242        ))?;
243    };
244
245    if checksum != expected {
246        return Err(Error::http(
247            StatusCode::BAD_REQUEST,
248            "invalid checksum".into(),
249        ))?;
250    }
251
252    let incident_id = Uuid::new_v4().to_string();
253
254    // dump JSON into S3 so we can get frame offsets if we need to.
255    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
256        blob_store_client
257            .put_object()
258            .bucket(CRASH_REPORTS_BUCKET)
259            .key(incident_id.clone() + ".hang.json")
260            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
261            .body(ByteStream::from(body.to_vec()))
262            .send()
263            .await
264            .map_err(|e| log::error!("Failed to upload crash: {}", e))
265            .ok();
266    }
267
268    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
269        log::error!("can't parse report json: {err}");
270        Error::Internal(anyhow!(err))
271    })?;
272
273    let mut backtrace = "Possible hang detected on main thread:".to_string();
274    let unknown = "<unknown>".to_string();
275    for frame in report.backtrace.iter() {
276        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
277    }
278
279    tracing::error!(
280        service = "client",
281        version = %report.app_version.unwrap_or_default().to_string(),
282        os_name = %report.os_name,
283        os_version = report.os_version.unwrap_or_default().to_string(),
284        incident_id = %incident_id,
285        installation_id = %report.installation_id.unwrap_or_default(),
286        backtrace = %backtrace,
287        "hang report");
288
289    Ok(())
290}
291
292pub async fn post_panic(
293    Extension(app): Extension<Arc<AppState>>,
294    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
295    body: Bytes,
296) -> Result<()> {
297    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
298        return Err(Error::http(
299            StatusCode::INTERNAL_SERVER_ERROR,
300            "events not enabled".into(),
301        ))?;
302    };
303
304    if checksum != expected {
305        return Err(Error::http(
306            StatusCode::BAD_REQUEST,
307            "invalid checksum".into(),
308        ))?;
309    }
310
311    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
312        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
313    let incident_id = uuid::Uuid::new_v4().to_string();
314    let panic = report.panic;
315
316    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
317        return Err(Error::http(
318            StatusCode::BAD_REQUEST,
319            "invalid os version".into(),
320        ))?;
321    }
322
323    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
324        let response = blob_store_client
325            .head_object()
326            .bucket(CRASH_REPORTS_BUCKET)
327            .key(incident_id.clone() + ".json")
328            .send()
329            .await;
330
331        if response.is_ok() {
332            log::info!("We've already uploaded this crash");
333            return Ok(());
334        }
335
336        blob_store_client
337            .put_object()
338            .bucket(CRASH_REPORTS_BUCKET)
339            .key(incident_id.clone() + ".json")
340            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
341            .body(ByteStream::from(body.to_vec()))
342            .send()
343            .await
344            .map_err(|e| log::error!("Failed to upload crash: {}", e))
345            .ok();
346    }
347
348    let backtrace = panic.backtrace.join("\n");
349
350    tracing::error!(
351        service = "client",
352        version = %panic.app_version,
353        os_name = %panic.os_name,
354        os_version = %panic.os_version.clone().unwrap_or_default(),
355        incident_id = %incident_id,
356        installation_id = %panic.installation_id.clone().unwrap_or_default(),
357        description = %panic.payload,
358        backtrace = %backtrace,
359        "panic report"
360    );
361
362    if let Some(kinesis_client) = app.kinesis_client.clone() {
363        if let Some(stream) = app.config.kinesis_stream.clone() {
364            let properties = json!({
365                "app_version": panic.app_version,
366                "os_name": panic.os_name,
367                "os_version": panic.os_version,
368                "incident_id": incident_id,
369                "installation_id": panic.installation_id,
370                "description": panic.payload,
371                "backtrace": backtrace,
372            });
373            let row = SnowflakeRow::new(
374                "Panic Reported",
375                None,
376                false,
377                panic.installation_id.clone(),
378                properties,
379            );
380            let data = serde_json::to_vec(&row)?;
381            kinesis_client
382                .put_record()
383                .stream_name(stream)
384                .partition_key(row.insert_id.unwrap_or_default())
385                .data(data.into())
386                .send()
387                .await
388                .log_err();
389        }
390    }
391
392    let backtrace = if panic.backtrace.len() > 25 {
393        let total = panic.backtrace.len();
394        format!(
395            "{}\n   and {} more",
396            panic
397                .backtrace
398                .iter()
399                .take(20)
400                .cloned()
401                .collect::<Vec<_>>()
402                .join("\n"),
403            total - 20
404        )
405    } else {
406        panic.backtrace.join("\n")
407    };
408
409    if !report_to_slack(&panic) {
410        return Ok(());
411    }
412
413    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
414
415    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
416        let payload = slack::WebhookBody::new(|w| {
417            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
418                .add_section(|s| {
419                    s.add_field(slack::Text::markdown(format!(
420                        "*Version:*\n {} ",
421                        panic.app_version
422                    )))
423                    .add_field({
424                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
425                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
426                            hostname.strip_prefix("http://").unwrap_or_default()
427                        });
428
429                        slack::Text::markdown(format!(
430                            "*{} {}:*\n<https://{}.{}/{}.json|{}…>",
431                            panic.os_name,
432                            panic.os_version.unwrap_or_default(),
433                            CRASH_REPORTS_BUCKET,
434                            hostname,
435                            incident_id,
436                            incident_id.chars().take(8).collect::<String>(),
437                        ))
438                    })
439                })
440                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
441        });
442        let payload_json = serde_json::to_string(&payload).map_err(|err| {
443            log::error!("Failed to serialize payload to JSON: {err}");
444            Error::Internal(anyhow!(err))
445        })?;
446
447        reqwest::Client::new()
448            .post(slack_panics_webhook)
449            .header("Content-Type", "application/json")
450            .body(payload_json)
451            .send()
452            .await
453            .map_err(|err| {
454                log::error!("Failed to send payload to Slack: {err}");
455                Error::Internal(anyhow!(err))
456            })?;
457    }
458
459    Ok(())
460}
461
462fn report_to_slack(panic: &Panic) -> bool {
463    // Panics on macOS should make their way to Slack as a crash report,
464    // so we don't need to send them a second time via this channel.
465    if panic.os_name == "macOS" {
466        return false;
467    }
468
469    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
470        return false;
471    }
472
473    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
474        return false;
475    }
476
477    if panic
478        .payload
479        .contains("GPU has crashed, and no debug information is available")
480    {
481        return false;
482    }
483
484    true
485}
486
487pub async fn post_events(
488    Extension(app): Extension<Arc<AppState>>,
489    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
490    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
491    body: Bytes,
492) -> Result<()> {
493    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
494        return Err(Error::http(
495            StatusCode::INTERNAL_SERVER_ERROR,
496            "events not enabled".into(),
497        ))?;
498    };
499
500    let checksum_matched = checksum == expected;
501
502    let request_body: telemetry_events::EventRequestBody =
503        serde_json::from_slice(&body).map_err(|err| {
504            log::error!("can't parse event json: {err}");
505            Error::Internal(anyhow!(err))
506        })?;
507
508    let Some(last_event) = request_body.events.last() else {
509        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
510    };
511    let country_code = country_code_header.map(|h| h.to_string());
512
513    let first_event_at = chrono::Utc::now()
514        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
515
516    if let Some(kinesis_client) = app.kinesis_client.clone() {
517        if let Some(stream) = app.config.kinesis_stream.clone() {
518            let mut request = kinesis_client.put_records().stream_name(stream);
519            let mut has_records = false;
520            for row in for_snowflake(
521                request_body.clone(),
522                first_event_at,
523                country_code.clone(),
524                checksum_matched,
525            ) {
526                if let Some(data) = serde_json::to_vec(&row).log_err() {
527                    request = request.records(
528                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
529                            .partition_key(request_body.system_id.clone().unwrap_or_default())
530                            .data(data.into())
531                            .build()
532                            .unwrap(),
533                    );
534                    has_records = true;
535                }
536            }
537            if has_records {
538                request.send().await.log_err();
539            }
540        }
541    };
542
543    Ok(())
544}
545
546pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
547    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
548
549    let mut summer = Sha256::new();
550    summer.update(checksum_seed);
551    summer.update(json);
552    summer.update(checksum_seed);
553    Some(summer.finalize().into_iter().collect())
554}
555
556fn for_snowflake(
557    body: EventRequestBody,
558    first_event_at: chrono::DateTime<chrono::Utc>,
559    country_code: Option<String>,
560    checksum_matched: bool,
561) -> impl Iterator<Item = SnowflakeRow> {
562    body.events.into_iter().filter_map(move |event| {
563        let timestamp =
564            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
565        // We will need to double check, but I believe all of the events that
566        // are being transformed here are now migrated over to use the
567        // telemetry::event! macro, as of this commit so this code can go away
568        // when we feel enough users have upgraded past this point.
569        let (event_type, mut event_properties) = match &event.event {
570            Event::Editor(e) => (
571                match e.operation.as_str() {
572                    "open" => "Editor Opened".to_string(),
573                    "save" => "Editor Saved".to_string(),
574                    _ => format!("Unknown Editor Event: {}", e.operation),
575                },
576                serde_json::to_value(e).unwrap(),
577            ),
578            Event::InlineCompletion(e) => (
579                format!(
580                    "Edit Prediction {}",
581                    if e.suggestion_accepted {
582                        "Accepted"
583                    } else {
584                        "Discarded"
585                    }
586                ),
587                serde_json::to_value(e).unwrap(),
588            ),
589            Event::InlineCompletionRating(e) => (
590                "Edit Prediction Rated".to_string(),
591                serde_json::to_value(e).unwrap(),
592            ),
593            Event::Call(e) => {
594                let event_type = match e.operation.trim() {
595                    "unshare project" => "Project Unshared".to_string(),
596                    "open channel notes" => "Channel Notes Opened".to_string(),
597                    "share project" => "Project Shared".to_string(),
598                    "join channel" => "Channel Joined".to_string(),
599                    "hang up" => "Call Ended".to_string(),
600                    "accept incoming" => "Incoming Call Accepted".to_string(),
601                    "invite" => "Participant Invited".to_string(),
602                    "disable microphone" => "Microphone Disabled".to_string(),
603                    "enable microphone" => "Microphone Enabled".to_string(),
604                    "enable screen share" => "Screen Share Enabled".to_string(),
605                    "disable screen share" => "Screen Share Disabled".to_string(),
606                    "decline incoming" => "Incoming Call Declined".to_string(),
607                    _ => format!("Unknown Call Event: {}", e.operation),
608                };
609
610                (event_type, serde_json::to_value(e).unwrap())
611            }
612            Event::Assistant(e) => (
613                match e.phase {
614                    telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
615                    telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
616                    telemetry_events::AssistantPhase::Accepted => {
617                        "Assistant Response Accepted".to_string()
618                    }
619                    telemetry_events::AssistantPhase::Rejected => {
620                        "Assistant Response Rejected".to_string()
621                    }
622                },
623                serde_json::to_value(e).unwrap(),
624            ),
625            Event::Cpu(_) | Event::Memory(_) => return None,
626            Event::App(e) => {
627                let mut properties = json!({});
628                let event_type = match e.operation.trim() {
629                    // App
630                    "open" => "App Opened".to_string(),
631                    "first open" => "App First Opened".to_string(),
632                    "first open for release channel" => {
633                        "App First Opened For Release Channel".to_string()
634                    }
635                    "close" => "App Closed".to_string(),
636
637                    // Project
638                    "open project" => "Project Opened".to_string(),
639                    "open node project" => {
640                        properties["project_type"] = json!("node");
641                        "Project Opened".to_string()
642                    }
643                    "open pnpm project" => {
644                        properties["project_type"] = json!("pnpm");
645                        "Project Opened".to_string()
646                    }
647                    "open yarn project" => {
648                        properties["project_type"] = json!("yarn");
649                        "Project Opened".to_string()
650                    }
651
652                    // SSH
653                    "create ssh server" => "SSH Server Created".to_string(),
654                    "create ssh project" => "SSH Project Created".to_string(),
655                    "open ssh project" => "SSH Project Opened".to_string(),
656
657                    // Welcome Page
658                    "welcome page: change keymap" => "Welcome Keymap Changed".to_string(),
659                    "welcome page: change theme" => "Welcome Theme Changed".to_string(),
660                    "welcome page: close" => "Welcome Page Closed".to_string(),
661                    "welcome page: edit settings" => "Welcome Settings Edited".to_string(),
662                    "welcome page: install cli" => "Welcome CLI Installed".to_string(),
663                    "welcome page: open" => "Welcome Page Opened".to_string(),
664                    "welcome page: open extensions" => "Welcome Extensions Page Opened".to_string(),
665                    "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
666                    "welcome page: toggle diagnostic telemetry" => {
667                        "Welcome Diagnostic Telemetry Toggled".to_string()
668                    }
669                    "welcome page: toggle metric telemetry" => {
670                        "Welcome Metric Telemetry Toggled".to_string()
671                    }
672                    "welcome page: toggle vim" => "Welcome Vim Mode Toggled".to_string(),
673                    "welcome page: view docs" => "Welcome Documentation Viewed".to_string(),
674
675                    // Extensions
676                    "extensions page: open" => "Extensions Page Opened".to_string(),
677                    "extensions: install extension" => "Extension Installed".to_string(),
678                    "extensions: uninstall extension" => "Extension Uninstalled".to_string(),
679
680                    // Misc
681                    "markdown preview: open" => "Markdown Preview Opened".to_string(),
682                    "project diagnostics: open" => "Project Diagnostics Opened".to_string(),
683                    "project search: open" => "Project Search Opened".to_string(),
684                    "repl sessions: open" => "REPL Session Started".to_string(),
685
686                    // Feature Upsell
687                    "feature upsell: toggle vim" => {
688                        properties["source"] = json!("Feature Upsell");
689                        "Vim Mode Toggled".to_string()
690                    }
691                    _ => e
692                        .operation
693                        .strip_prefix("feature upsell: viewed docs (")
694                        .and_then(|s| s.strip_suffix(')'))
695                        .map_or_else(
696                            || format!("Unknown App Event: {}", e.operation),
697                            |docs_url| {
698                                properties["url"] = json!(docs_url);
699                                properties["source"] = json!("Feature Upsell");
700                                "Documentation Viewed".to_string()
701                            },
702                        ),
703                };
704                (event_type, properties)
705            }
706            Event::Setting(e) => (
707                "Settings Changed".to_string(),
708                serde_json::to_value(e).unwrap(),
709            ),
710            Event::Extension(e) => (
711                "Extension Loaded".to_string(),
712                serde_json::to_value(e).unwrap(),
713            ),
714            Event::Edit(e) => (
715                "Editor Edited".to_string(),
716                serde_json::to_value(e).unwrap(),
717            ),
718            Event::Action(e) => (
719                "Action Invoked".to_string(),
720                serde_json::to_value(e).unwrap(),
721            ),
722            Event::Repl(e) => (
723                "Kernel Status Changed".to_string(),
724                serde_json::to_value(e).unwrap(),
725            ),
726            Event::Flexible(e) => (
727                e.event_type.clone(),
728                serde_json::to_value(&e.event_properties).unwrap(),
729            ),
730        };
731
732        if let serde_json::Value::Object(ref mut map) = event_properties {
733            map.insert("app_version".to_string(), body.app_version.clone().into());
734            map.insert("os_name".to_string(), body.os_name.clone().into());
735            map.insert("os_version".to_string(), body.os_version.clone().into());
736            map.insert("architecture".to_string(), body.architecture.clone().into());
737            map.insert(
738                "release_channel".to_string(),
739                body.release_channel.clone().into(),
740            );
741            map.insert("signed_in".to_string(), event.signed_in.into());
742            map.insert("checksum_matched".to_string(), checksum_matched.into());
743            if let Some(country_code) = country_code.as_ref() {
744                map.insert("country".to_string(), country_code.clone().into());
745            }
746        }
747
748        // NOTE: most amplitude user properties are read out of our event_properties
749        // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
750        // for how that is configured.
751        let user_properties = body.is_staff.map(|is_staff| {
752            serde_json::json!({
753                "is_staff": is_staff,
754            })
755        });
756
757        Some(SnowflakeRow {
758            time: timestamp,
759            user_id: body.metrics_id.clone(),
760            device_id: body.system_id.clone(),
761            event_type,
762            event_properties,
763            user_properties,
764            insert_id: Some(Uuid::new_v4().to_string()),
765        })
766    })
767}
768
769#[derive(Serialize, Deserialize, Debug)]
770pub struct SnowflakeRow {
771    pub time: chrono::DateTime<chrono::Utc>,
772    pub user_id: Option<String>,
773    pub device_id: Option<String>,
774    pub event_type: String,
775    pub event_properties: serde_json::Value,
776    pub user_properties: Option<serde_json::Value>,
777    pub insert_id: Option<String>,
778}
779
780impl SnowflakeRow {
781    pub fn new(
782        event_type: impl Into<String>,
783        metrics_id: Option<Uuid>,
784        is_staff: bool,
785        system_id: Option<String>,
786        event_properties: serde_json::Value,
787    ) -> Self {
788        Self {
789            time: chrono::Utc::now(),
790            event_type: event_type.into(),
791            device_id: system_id,
792            user_id: metrics_id.map(|id| id.to_string()),
793            insert_id: Some(uuid::Uuid::new_v4().to_string()),
794            event_properties,
795            user_properties: Some(json!({"is_staff": is_staff})),
796        }
797    }
798
799    pub async fn write(
800        self,
801        client: &Option<aws_sdk_kinesis::Client>,
802        stream: &Option<String>,
803    ) -> anyhow::Result<()> {
804        let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
805            return Ok(());
806        };
807        let row = serde_json::to_vec(&self)?;
808        client
809            .put_record()
810            .stream_name(stream)
811            .partition_key(&self.user_id.unwrap_or_default())
812            .data(row.into())
813            .send()
814            .await?;
815        Ok(())
816    }
817}