events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::clickhouse::write_to_table;
   4use crate::{api::slack, AppState, Error, Result};
   5use anyhow::{anyhow, Context};
   6use aws_sdk_s3::primitives::ByteStream;
   7use axum::{
   8    body::Bytes,
   9    headers::Header,
  10    http::{HeaderMap, HeaderName, StatusCode},
  11    routing::post,
  12    Extension, Router, TypedHeader,
  13};
  14use chrono::Duration;
  15use rpc::ExtensionMetadata;
  16use semantic_version::SemanticVersion;
  17use serde::{Deserialize, Serialize, Serializer};
  18use serde_json::json;
  19use sha2::{Digest, Sha256};
  20use std::sync::{Arc, OnceLock};
  21use telemetry_events::{
  22    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  23    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic,
  24    ReplEvent, SettingEvent,
  25};
  26use util::ResultExt;
  27use uuid::Uuid;
  28
  29const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  30
  31pub fn router() -> Router {
  32    Router::new()
  33        .route("/telemetry/events", post(post_events))
  34        .route("/telemetry/crashes", post(post_crash))
  35        .route("/telemetry/panics", post(post_panic))
  36        .route("/telemetry/hangs", post(post_hang))
  37}
  38
  39pub struct ZedChecksumHeader(Vec<u8>);
  40
  41impl Header for ZedChecksumHeader {
  42    fn name() -> &'static HeaderName {
  43        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  44        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  45    }
  46
  47    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  48    where
  49        Self: Sized,
  50        I: Iterator<Item = &'i axum::http::HeaderValue>,
  51    {
  52        let checksum = values
  53            .next()
  54            .ok_or_else(axum::headers::Error::invalid)?
  55            .to_str()
  56            .map_err(|_| axum::headers::Error::invalid())?;
  57
  58        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  59        Ok(Self(bytes))
  60    }
  61
  62    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  63        unimplemented!()
  64    }
  65}
  66
  67pub async fn post_crash(
  68    Extension(app): Extension<Arc<AppState>>,
  69    headers: HeaderMap,
  70    body: Bytes,
  71) -> Result<()> {
  72    let report = IpsFile::parse(&body)?;
  73    let version_threshold = SemanticVersion::new(0, 123, 0);
  74
  75    let bundle_id = &report.header.bundle_id;
  76    let app_version = &report.app_version();
  77
  78    if bundle_id == "dev.zed.Zed-Dev" {
  79        log::error!("Crash uploads from {} are ignored.", bundle_id);
  80        return Ok(());
  81    }
  82
  83    if app_version.is_none() || app_version.unwrap() < version_threshold {
  84        log::error!(
  85            "Crash uploads from {} are ignored.",
  86            report.header.app_version
  87        );
  88        return Ok(());
  89    }
  90    let app_version = app_version.unwrap();
  91
  92    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  93        let response = blob_store_client
  94            .head_object()
  95            .bucket(CRASH_REPORTS_BUCKET)
  96            .key(report.header.incident_id.clone() + ".ips")
  97            .send()
  98            .await;
  99
 100        if response.is_ok() {
 101            log::info!("We've already uploaded this crash");
 102            return Ok(());
 103        }
 104
 105        blob_store_client
 106            .put_object()
 107            .bucket(CRASH_REPORTS_BUCKET)
 108            .key(report.header.incident_id.clone() + ".ips")
 109            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 110            .body(ByteStream::from(body.to_vec()))
 111            .send()
 112            .await
 113            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 114            .ok();
 115    }
 116
 117    let recent_panic_on: Option<i64> = headers
 118        .get("x-zed-panicked-on")
 119        .and_then(|h| h.to_str().ok())
 120        .and_then(|s| s.parse().ok());
 121
 122    let installation_id = headers
 123        .get("x-zed-installation-id")
 124        .and_then(|h| h.to_str().ok())
 125        .map(|s| s.to_string())
 126        .unwrap_or_default();
 127
 128    let mut recent_panic = None;
 129
 130    if let Some(recent_panic_on) = recent_panic_on {
 131        let crashed_at = match report.timestamp() {
 132            Ok(t) => Some(t),
 133            Err(e) => {
 134                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 135                None
 136            }
 137        };
 138        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 139            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 140        }
 141    }
 142
 143    let description = report.description(recent_panic);
 144    let summary = report.backtrace_summary();
 145
 146    tracing::error!(
 147        service = "client",
 148        version = %report.header.app_version,
 149        os_version = %report.header.os_version,
 150        bundle_id = %report.header.bundle_id,
 151        incident_id = %report.header.incident_id,
 152        installation_id = %installation_id,
 153        description = %description,
 154        backtrace = %summary,
 155        "crash report"
 156    );
 157
 158    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 159        let payload = slack::WebhookBody::new(|w| {
 160            w.add_section(|s| s.text(slack::Text::markdown(description)))
 161                .add_section(|s| {
 162                    s.add_field(slack::Text::markdown(format!(
 163                        "*Version:*\n{} ({})",
 164                        bundle_id, app_version
 165                    )))
 166                    .add_field({
 167                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 168                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 169                            hostname.strip_prefix("http://").unwrap_or_default()
 170                        });
 171
 172                        slack::Text::markdown(format!(
 173                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 174                            CRASH_REPORTS_BUCKET,
 175                            hostname,
 176                            report.header.incident_id,
 177                            report
 178                                .header
 179                                .incident_id
 180                                .chars()
 181                                .take(8)
 182                                .collect::<String>(),
 183                        ))
 184                    })
 185                })
 186                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 187        });
 188        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 189            log::error!("Failed to serialize payload to JSON: {err}");
 190            Error::Internal(anyhow!(err))
 191        })?;
 192
 193        reqwest::Client::new()
 194            .post(slack_panics_webhook)
 195            .header("Content-Type", "application/json")
 196            .body(payload_json)
 197            .send()
 198            .await
 199            .map_err(|err| {
 200                log::error!("Failed to send payload to Slack: {err}");
 201                Error::Internal(anyhow!(err))
 202            })?;
 203    }
 204
 205    Ok(())
 206}
 207
 208pub async fn post_hang(
 209    Extension(app): Extension<Arc<AppState>>,
 210    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 211    body: Bytes,
 212) -> Result<()> {
 213    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 214        return Err(Error::http(
 215            StatusCode::INTERNAL_SERVER_ERROR,
 216            "events not enabled".into(),
 217        ))?;
 218    };
 219
 220    if checksum != expected {
 221        return Err(Error::http(
 222            StatusCode::BAD_REQUEST,
 223            "invalid checksum".into(),
 224        ))?;
 225    }
 226
 227    let incident_id = Uuid::new_v4().to_string();
 228
 229    // dump JSON into S3 so we can get frame offsets if we need to.
 230    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 231        blob_store_client
 232            .put_object()
 233            .bucket(CRASH_REPORTS_BUCKET)
 234            .key(incident_id.clone() + ".hang.json")
 235            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 236            .body(ByteStream::from(body.to_vec()))
 237            .send()
 238            .await
 239            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 240            .ok();
 241    }
 242
 243    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 244        log::error!("can't parse report json: {err}");
 245        Error::Internal(anyhow!(err))
 246    })?;
 247
 248    let mut backtrace = "Possible hang detected on main thread:".to_string();
 249    let unknown = "<unknown>".to_string();
 250    for frame in report.backtrace.iter() {
 251        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 252    }
 253
 254    tracing::error!(
 255        service = "client",
 256        version = %report.app_version.unwrap_or_default().to_string(),
 257        os_name = %report.os_name,
 258        os_version = report.os_version.unwrap_or_default().to_string(),
 259        incident_id = %incident_id,
 260        installation_id = %report.installation_id.unwrap_or_default(),
 261        backtrace = %backtrace,
 262        "hang report");
 263
 264    Ok(())
 265}
 266
 267pub async fn post_panic(
 268    Extension(app): Extension<Arc<AppState>>,
 269    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 270    body: Bytes,
 271) -> Result<()> {
 272    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 273        return Err(Error::http(
 274            StatusCode::INTERNAL_SERVER_ERROR,
 275            "events not enabled".into(),
 276        ))?;
 277    };
 278
 279    if checksum != expected {
 280        return Err(Error::http(
 281            StatusCode::BAD_REQUEST,
 282            "invalid checksum".into(),
 283        ))?;
 284    }
 285
 286    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 287        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 288    let panic = report.panic;
 289
 290    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 291        return Err(Error::http(
 292            StatusCode::BAD_REQUEST,
 293            "invalid os version".into(),
 294        ))?;
 295    }
 296
 297    tracing::error!(
 298        service = "client",
 299        version = %panic.app_version,
 300        os_name = %panic.os_name,
 301        os_version = %panic.os_version.clone().unwrap_or_default(),
 302        installation_id = %panic.installation_id.clone().unwrap_or_default(),
 303        description = %panic.payload,
 304        backtrace = %panic.backtrace.join("\n"),
 305        "panic report"
 306    );
 307
 308    let backtrace = if panic.backtrace.len() > 25 {
 309        let total = panic.backtrace.len();
 310        format!(
 311            "{}\n   and {} more",
 312            panic
 313                .backtrace
 314                .iter()
 315                .take(20)
 316                .cloned()
 317                .collect::<Vec<_>>()
 318                .join("\n"),
 319            total - 20
 320        )
 321    } else {
 322        panic.backtrace.join("\n")
 323    };
 324
 325    if !report_to_slack(&panic) {
 326        return Ok(());
 327    }
 328
 329    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 330
 331    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 332        let payload = slack::WebhookBody::new(|w| {
 333            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 334                .add_section(|s| {
 335                    s.add_field(slack::Text::markdown(format!(
 336                        "*Version:*\n {} ",
 337                        panic.app_version
 338                    )))
 339                    .add_field({
 340                        slack::Text::markdown(format!(
 341                            "*OS:*\n{} {}",
 342                            panic.os_name,
 343                            panic.os_version.unwrap_or_default()
 344                        ))
 345                    })
 346                })
 347                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 348        });
 349        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 350            log::error!("Failed to serialize payload to JSON: {err}");
 351            Error::Internal(anyhow!(err))
 352        })?;
 353
 354        reqwest::Client::new()
 355            .post(slack_panics_webhook)
 356            .header("Content-Type", "application/json")
 357            .body(payload_json)
 358            .send()
 359            .await
 360            .map_err(|err| {
 361                log::error!("Failed to send payload to Slack: {err}");
 362                Error::Internal(anyhow!(err))
 363            })?;
 364    }
 365
 366    Ok(())
 367}
 368
 369fn report_to_slack(panic: &Panic) -> bool {
 370    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
 371        return false;
 372    }
 373
 374    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
 375        return false;
 376    }
 377
 378    if panic
 379        .payload
 380        .contains("GPU has crashed, and no debug information is available")
 381    {
 382        return false;
 383    }
 384
 385    true
 386}
 387
 388pub async fn post_events(
 389    Extension(app): Extension<Arc<AppState>>,
 390    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 391    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 392    body: Bytes,
 393) -> Result<()> {
 394    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 395        return Err(Error::http(
 396            StatusCode::INTERNAL_SERVER_ERROR,
 397            "events not enabled".into(),
 398        ))?;
 399    };
 400
 401    let checksum_matched = checksum == expected;
 402
 403    let request_body: telemetry_events::EventRequestBody =
 404        serde_json::from_slice(&body).map_err(|err| {
 405            log::error!("can't parse event json: {err}");
 406            Error::Internal(anyhow!(err))
 407        })?;
 408
 409    let mut to_upload = ToUpload::default();
 410    let Some(last_event) = request_body.events.last() else {
 411        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 412    };
 413    let country_code = country_code_header.map(|h| h.to_string());
 414
 415    let first_event_at = chrono::Utc::now()
 416        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 417
 418    if let Some(kinesis_client) = app.kinesis_client.clone() {
 419        if let Some(stream) = app.config.kinesis_stream.clone() {
 420            let mut request = kinesis_client.put_records().stream_name(stream);
 421            for row in for_snowflake(request_body.clone(), first_event_at) {
 422                if let Some(data) = serde_json::to_vec(&row).log_err() {
 423                    request = request.records(
 424                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
 425                            .partition_key(request_body.system_id.clone().unwrap_or_default())
 426                            .data(data.into())
 427                            .build()
 428                            .unwrap(),
 429                    );
 430                }
 431            }
 432            request.send().await.log_err();
 433        }
 434    };
 435
 436    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 437        Err(Error::http(
 438            StatusCode::NOT_IMPLEMENTED,
 439            "not supported".into(),
 440        ))?
 441    };
 442
 443    let first_event_at = chrono::Utc::now()
 444        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 445
 446    for wrapper in &request_body.events {
 447        match &wrapper.event {
 448            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 449                event.clone(),
 450                wrapper,
 451                &request_body,
 452                first_event_at,
 453                country_code.clone(),
 454                checksum_matched,
 455            )),
 456            Event::InlineCompletion(event) => {
 457                to_upload
 458                    .inline_completion_events
 459                    .push(InlineCompletionEventRow::from_event(
 460                        event.clone(),
 461                        wrapper,
 462                        &request_body,
 463                        first_event_at,
 464                        country_code.clone(),
 465                        checksum_matched,
 466                    ))
 467            }
 468            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 469                event.clone(),
 470                wrapper,
 471                &request_body,
 472                first_event_at,
 473                checksum_matched,
 474            )),
 475            Event::Assistant(event) => {
 476                to_upload
 477                    .assistant_events
 478                    .push(AssistantEventRow::from_event(
 479                        event.clone(),
 480                        wrapper,
 481                        &request_body,
 482                        first_event_at,
 483                        checksum_matched,
 484                    ))
 485            }
 486            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 487                event.clone(),
 488                wrapper,
 489                &request_body,
 490                first_event_at,
 491                checksum_matched,
 492            )),
 493            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 494                event.clone(),
 495                wrapper,
 496                &request_body,
 497                first_event_at,
 498                checksum_matched,
 499            )),
 500            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 501                event.clone(),
 502                wrapper,
 503                &request_body,
 504                first_event_at,
 505                checksum_matched,
 506            )),
 507            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 508                event.clone(),
 509                wrapper,
 510                &request_body,
 511                first_event_at,
 512                checksum_matched,
 513            )),
 514            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 515                event.clone(),
 516                wrapper,
 517                &request_body,
 518                first_event_at,
 519                checksum_matched,
 520            )),
 521            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 522                event.clone(),
 523                wrapper,
 524                &request_body,
 525                first_event_at,
 526                checksum_matched,
 527            )),
 528            Event::Extension(event) => {
 529                let metadata = app
 530                    .db
 531                    .get_extension_version(&event.extension_id, &event.version)
 532                    .await?;
 533                to_upload
 534                    .extension_events
 535                    .push(ExtensionEventRow::from_event(
 536                        event.clone(),
 537                        wrapper,
 538                        &request_body,
 539                        metadata,
 540                        first_event_at,
 541                        checksum_matched,
 542                    ))
 543            }
 544            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 545                event.clone(),
 546                wrapper,
 547                &request_body,
 548                first_event_at,
 549                checksum_matched,
 550            )),
 551        }
 552    }
 553
 554    to_upload
 555        .upload(&clickhouse_client)
 556        .await
 557        .map_err(|err| Error::Internal(anyhow!(err)))?;
 558
 559    Ok(())
 560}
 561
 562#[derive(Default)]
 563struct ToUpload {
 564    editor_events: Vec<EditorEventRow>,
 565    inline_completion_events: Vec<InlineCompletionEventRow>,
 566    assistant_events: Vec<AssistantEventRow>,
 567    call_events: Vec<CallEventRow>,
 568    cpu_events: Vec<CpuEventRow>,
 569    memory_events: Vec<MemoryEventRow>,
 570    app_events: Vec<AppEventRow>,
 571    setting_events: Vec<SettingEventRow>,
 572    extension_events: Vec<ExtensionEventRow>,
 573    edit_events: Vec<EditEventRow>,
 574    action_events: Vec<ActionEventRow>,
 575    repl_events: Vec<ReplEventRow>,
 576}
 577
 578impl ToUpload {
 579    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 580        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 581        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 582            .await
 583            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 584
 585        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 586        write_to_table(
 587            INLINE_COMPLETION_EVENTS_TABLE,
 588            &self.inline_completion_events,
 589            clickhouse_client,
 590        )
 591        .await
 592        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 593
 594        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 595        write_to_table(
 596            ASSISTANT_EVENTS_TABLE,
 597            &self.assistant_events,
 598            clickhouse_client,
 599        )
 600        .await
 601        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 602
 603        const CALL_EVENTS_TABLE: &str = "call_events";
 604        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 605            .await
 606            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 607
 608        const CPU_EVENTS_TABLE: &str = "cpu_events";
 609        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 610            .await
 611            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 612
 613        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 614        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 615            .await
 616            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 617
 618        const APP_EVENTS_TABLE: &str = "app_events";
 619        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 620            .await
 621            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 622
 623        const SETTING_EVENTS_TABLE: &str = "setting_events";
 624        write_to_table(
 625            SETTING_EVENTS_TABLE,
 626            &self.setting_events,
 627            clickhouse_client,
 628        )
 629        .await
 630        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 631
 632        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 633        write_to_table(
 634            EXTENSION_EVENTS_TABLE,
 635            &self.extension_events,
 636            clickhouse_client,
 637        )
 638        .await
 639        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 640
 641        const EDIT_EVENTS_TABLE: &str = "edit_events";
 642        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 643            .await
 644            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 645
 646        const ACTION_EVENTS_TABLE: &str = "action_events";
 647        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 648            .await
 649            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 650
 651        const REPL_EVENTS_TABLE: &str = "repl_events";
 652        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 653            .await
 654            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 655
 656        Ok(())
 657    }
 658}
 659
 660pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 661where
 662    S: Serializer,
 663{
 664    if country_code.len() != 2 {
 665        use serde::ser::Error;
 666        return Err(S::Error::custom(
 667            "country_code must be exactly 2 characters",
 668        ));
 669    }
 670
 671    let country_code = country_code.as_bytes();
 672
 673    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 674}
 675
 676#[derive(Serialize, Debug, clickhouse::Row)]
 677pub struct EditorEventRow {
 678    system_id: String,
 679    installation_id: String,
 680    session_id: Option<String>,
 681    metrics_id: String,
 682    operation: String,
 683    app_version: String,
 684    file_extension: String,
 685    os_name: String,
 686    os_version: String,
 687    release_channel: String,
 688    signed_in: bool,
 689    vim_mode: bool,
 690    #[serde(serialize_with = "serialize_country_code")]
 691    country_code: String,
 692    region_code: String,
 693    city: String,
 694    time: i64,
 695    copilot_enabled: bool,
 696    copilot_enabled_for_language: bool,
 697    architecture: String,
 698    is_staff: Option<bool>,
 699    major: Option<i32>,
 700    minor: Option<i32>,
 701    patch: Option<i32>,
 702    checksum_matched: bool,
 703    is_via_ssh: bool,
 704}
 705
 706impl EditorEventRow {
 707    fn from_event(
 708        event: EditorEvent,
 709        wrapper: &EventWrapper,
 710        body: &EventRequestBody,
 711        first_event_at: chrono::DateTime<chrono::Utc>,
 712        country_code: Option<String>,
 713        checksum_matched: bool,
 714    ) -> Self {
 715        let semver = body.semver();
 716        let time =
 717            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 718
 719        Self {
 720            app_version: body.app_version.clone(),
 721            major: semver.map(|v| v.major() as i32),
 722            minor: semver.map(|v| v.minor() as i32),
 723            patch: semver.map(|v| v.patch() as i32),
 724            checksum_matched,
 725            release_channel: body.release_channel.clone().unwrap_or_default(),
 726            os_name: body.os_name.clone(),
 727            os_version: body.os_version.clone().unwrap_or_default(),
 728            architecture: body.architecture.clone(),
 729            system_id: body.system_id.clone().unwrap_or_default(),
 730            installation_id: body.installation_id.clone().unwrap_or_default(),
 731            session_id: body.session_id.clone(),
 732            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 733            is_staff: body.is_staff,
 734            time: time.timestamp_millis(),
 735            operation: event.operation,
 736            file_extension: event.file_extension.unwrap_or_default(),
 737            signed_in: wrapper.signed_in,
 738            vim_mode: event.vim_mode,
 739            copilot_enabled: event.copilot_enabled,
 740            copilot_enabled_for_language: event.copilot_enabled_for_language,
 741            country_code: country_code.unwrap_or("XX".to_string()),
 742            region_code: "".to_string(),
 743            city: "".to_string(),
 744            is_via_ssh: event.is_via_ssh,
 745        }
 746    }
 747}
 748
 749#[derive(Serialize, Debug, clickhouse::Row)]
 750pub struct InlineCompletionEventRow {
 751    installation_id: String,
 752    session_id: Option<String>,
 753    provider: String,
 754    suggestion_accepted: bool,
 755    app_version: String,
 756    file_extension: String,
 757    os_name: String,
 758    os_version: String,
 759    release_channel: String,
 760    signed_in: bool,
 761    #[serde(serialize_with = "serialize_country_code")]
 762    country_code: String,
 763    region_code: String,
 764    city: String,
 765    time: i64,
 766    is_staff: Option<bool>,
 767    major: Option<i32>,
 768    minor: Option<i32>,
 769    patch: Option<i32>,
 770    checksum_matched: bool,
 771}
 772
 773impl InlineCompletionEventRow {
 774    fn from_event(
 775        event: InlineCompletionEvent,
 776        wrapper: &EventWrapper,
 777        body: &EventRequestBody,
 778        first_event_at: chrono::DateTime<chrono::Utc>,
 779        country_code: Option<String>,
 780        checksum_matched: bool,
 781    ) -> Self {
 782        let semver = body.semver();
 783        let time =
 784            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 785
 786        Self {
 787            app_version: body.app_version.clone(),
 788            major: semver.map(|v| v.major() as i32),
 789            minor: semver.map(|v| v.minor() as i32),
 790            patch: semver.map(|v| v.patch() as i32),
 791            checksum_matched,
 792            release_channel: body.release_channel.clone().unwrap_or_default(),
 793            os_name: body.os_name.clone(),
 794            os_version: body.os_version.clone().unwrap_or_default(),
 795            installation_id: body.installation_id.clone().unwrap_or_default(),
 796            session_id: body.session_id.clone(),
 797            is_staff: body.is_staff,
 798            time: time.timestamp_millis(),
 799            file_extension: event.file_extension.unwrap_or_default(),
 800            signed_in: wrapper.signed_in,
 801            country_code: country_code.unwrap_or("XX".to_string()),
 802            region_code: "".to_string(),
 803            city: "".to_string(),
 804            provider: event.provider,
 805            suggestion_accepted: event.suggestion_accepted,
 806        }
 807    }
 808}
 809
 810#[derive(Serialize, Debug, clickhouse::Row)]
 811pub struct CallEventRow {
 812    // AppInfoBase
 813    app_version: String,
 814    major: Option<i32>,
 815    minor: Option<i32>,
 816    patch: Option<i32>,
 817    release_channel: String,
 818    os_name: String,
 819    os_version: String,
 820    checksum_matched: bool,
 821
 822    // ClientEventBase
 823    installation_id: String,
 824    session_id: Option<String>,
 825    is_staff: Option<bool>,
 826    time: i64,
 827
 828    // CallEventRow
 829    operation: String,
 830    room_id: Option<u64>,
 831    channel_id: Option<u64>,
 832}
 833
 834impl CallEventRow {
 835    fn from_event(
 836        event: CallEvent,
 837        wrapper: &EventWrapper,
 838        body: &EventRequestBody,
 839        first_event_at: chrono::DateTime<chrono::Utc>,
 840        checksum_matched: bool,
 841    ) -> Self {
 842        let semver = body.semver();
 843        let time =
 844            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 845
 846        Self {
 847            app_version: body.app_version.clone(),
 848            major: semver.map(|v| v.major() as i32),
 849            minor: semver.map(|v| v.minor() as i32),
 850            patch: semver.map(|v| v.patch() as i32),
 851            checksum_matched,
 852            release_channel: body.release_channel.clone().unwrap_or_default(),
 853            os_name: body.os_name.clone(),
 854            os_version: body.os_version.clone().unwrap_or_default(),
 855            installation_id: body.installation_id.clone().unwrap_or_default(),
 856            session_id: body.session_id.clone(),
 857            is_staff: body.is_staff,
 858            time: time.timestamp_millis(),
 859            operation: event.operation,
 860            room_id: event.room_id,
 861            channel_id: event.channel_id,
 862        }
 863    }
 864}
 865
 866#[derive(Serialize, Debug, clickhouse::Row)]
 867pub struct AssistantEventRow {
 868    // AppInfoBase
 869    app_version: String,
 870    major: Option<i32>,
 871    minor: Option<i32>,
 872    patch: Option<i32>,
 873    checksum_matched: bool,
 874    release_channel: String,
 875    os_name: String,
 876    os_version: String,
 877
 878    // ClientEventBase
 879    installation_id: Option<String>,
 880    session_id: Option<String>,
 881    is_staff: Option<bool>,
 882    time: i64,
 883
 884    // AssistantEventRow
 885    conversation_id: String,
 886    kind: String,
 887    phase: String,
 888    model: String,
 889    response_latency_in_ms: Option<i64>,
 890    error_message: Option<String>,
 891}
 892
 893impl AssistantEventRow {
 894    fn from_event(
 895        event: AssistantEvent,
 896        wrapper: &EventWrapper,
 897        body: &EventRequestBody,
 898        first_event_at: chrono::DateTime<chrono::Utc>,
 899        checksum_matched: bool,
 900    ) -> Self {
 901        let semver = body.semver();
 902        let time =
 903            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 904
 905        Self {
 906            app_version: body.app_version.clone(),
 907            major: semver.map(|v| v.major() as i32),
 908            minor: semver.map(|v| v.minor() as i32),
 909            patch: semver.map(|v| v.patch() as i32),
 910            checksum_matched,
 911            release_channel: body.release_channel.clone().unwrap_or_default(),
 912            os_name: body.os_name.clone(),
 913            os_version: body.os_version.clone().unwrap_or_default(),
 914            installation_id: body.installation_id.clone(),
 915            session_id: body.session_id.clone(),
 916            is_staff: body.is_staff,
 917            time: time.timestamp_millis(),
 918            conversation_id: event.conversation_id.unwrap_or_default(),
 919            kind: event.kind.to_string(),
 920            phase: event.phase.to_string(),
 921            model: event.model,
 922            response_latency_in_ms: event
 923                .response_latency
 924                .map(|latency| latency.as_millis() as i64),
 925            error_message: event.error_message,
 926        }
 927    }
 928}
 929
 930#[derive(Debug, clickhouse::Row, Serialize)]
 931pub struct CpuEventRow {
 932    installation_id: Option<String>,
 933    session_id: Option<String>,
 934    is_staff: Option<bool>,
 935    usage_as_percentage: f32,
 936    core_count: u32,
 937    app_version: String,
 938    release_channel: String,
 939    os_name: String,
 940    os_version: String,
 941    time: i64,
 942    // pub normalized_cpu_usage: f64, MATERIALIZED
 943    major: Option<i32>,
 944    minor: Option<i32>,
 945    patch: Option<i32>,
 946    checksum_matched: bool,
 947}
 948
 949impl CpuEventRow {
 950    fn from_event(
 951        event: CpuEvent,
 952        wrapper: &EventWrapper,
 953        body: &EventRequestBody,
 954        first_event_at: chrono::DateTime<chrono::Utc>,
 955        checksum_matched: bool,
 956    ) -> Self {
 957        let semver = body.semver();
 958        let time =
 959            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 960
 961        Self {
 962            app_version: body.app_version.clone(),
 963            major: semver.map(|v| v.major() as i32),
 964            minor: semver.map(|v| v.minor() as i32),
 965            patch: semver.map(|v| v.patch() as i32),
 966            checksum_matched,
 967            release_channel: body.release_channel.clone().unwrap_or_default(),
 968            os_name: body.os_name.clone(),
 969            os_version: body.os_version.clone().unwrap_or_default(),
 970            installation_id: body.installation_id.clone(),
 971            session_id: body.session_id.clone(),
 972            is_staff: body.is_staff,
 973            time: time.timestamp_millis(),
 974            usage_as_percentage: event.usage_as_percentage,
 975            core_count: event.core_count,
 976        }
 977    }
 978}
 979
 980#[derive(Serialize, Debug, clickhouse::Row)]
 981pub struct MemoryEventRow {
 982    // AppInfoBase
 983    app_version: String,
 984    major: Option<i32>,
 985    minor: Option<i32>,
 986    patch: Option<i32>,
 987    checksum_matched: bool,
 988    release_channel: String,
 989    os_name: String,
 990    os_version: String,
 991
 992    // ClientEventBase
 993    installation_id: Option<String>,
 994    session_id: Option<String>,
 995    is_staff: Option<bool>,
 996    time: i64,
 997
 998    // MemoryEventRow
 999    memory_in_bytes: u64,
1000    virtual_memory_in_bytes: u64,
1001}
1002
1003impl MemoryEventRow {
1004    fn from_event(
1005        event: MemoryEvent,
1006        wrapper: &EventWrapper,
1007        body: &EventRequestBody,
1008        first_event_at: chrono::DateTime<chrono::Utc>,
1009        checksum_matched: bool,
1010    ) -> Self {
1011        let semver = body.semver();
1012        let time =
1013            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1014
1015        Self {
1016            app_version: body.app_version.clone(),
1017            major: semver.map(|v| v.major() as i32),
1018            minor: semver.map(|v| v.minor() as i32),
1019            patch: semver.map(|v| v.patch() as i32),
1020            checksum_matched,
1021            release_channel: body.release_channel.clone().unwrap_or_default(),
1022            os_name: body.os_name.clone(),
1023            os_version: body.os_version.clone().unwrap_or_default(),
1024            installation_id: body.installation_id.clone(),
1025            session_id: body.session_id.clone(),
1026            is_staff: body.is_staff,
1027            time: time.timestamp_millis(),
1028            memory_in_bytes: event.memory_in_bytes,
1029            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1030        }
1031    }
1032}
1033
1034#[derive(Serialize, Debug, clickhouse::Row)]
1035pub struct AppEventRow {
1036    // AppInfoBase
1037    app_version: String,
1038    major: Option<i32>,
1039    minor: Option<i32>,
1040    patch: Option<i32>,
1041    checksum_matched: bool,
1042    release_channel: String,
1043    os_name: String,
1044    os_version: String,
1045
1046    // ClientEventBase
1047    installation_id: Option<String>,
1048    session_id: Option<String>,
1049    is_staff: Option<bool>,
1050    time: i64,
1051
1052    // AppEventRow
1053    operation: String,
1054}
1055
1056impl AppEventRow {
1057    fn from_event(
1058        event: AppEvent,
1059        wrapper: &EventWrapper,
1060        body: &EventRequestBody,
1061        first_event_at: chrono::DateTime<chrono::Utc>,
1062        checksum_matched: bool,
1063    ) -> Self {
1064        let semver = body.semver();
1065        let time =
1066            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1067
1068        Self {
1069            app_version: body.app_version.clone(),
1070            major: semver.map(|v| v.major() as i32),
1071            minor: semver.map(|v| v.minor() as i32),
1072            patch: semver.map(|v| v.patch() as i32),
1073            checksum_matched,
1074            release_channel: body.release_channel.clone().unwrap_or_default(),
1075            os_name: body.os_name.clone(),
1076            os_version: body.os_version.clone().unwrap_or_default(),
1077            installation_id: body.installation_id.clone(),
1078            session_id: body.session_id.clone(),
1079            is_staff: body.is_staff,
1080            time: time.timestamp_millis(),
1081            operation: event.operation,
1082        }
1083    }
1084}
1085
1086#[derive(Serialize, Debug, clickhouse::Row)]
1087pub struct SettingEventRow {
1088    // AppInfoBase
1089    app_version: String,
1090    major: Option<i32>,
1091    minor: Option<i32>,
1092    patch: Option<i32>,
1093    checksum_matched: bool,
1094    release_channel: String,
1095    os_name: String,
1096    os_version: String,
1097
1098    // ClientEventBase
1099    installation_id: Option<String>,
1100    session_id: Option<String>,
1101    is_staff: Option<bool>,
1102    time: i64,
1103    // SettingEventRow
1104    setting: String,
1105    value: String,
1106}
1107
1108impl SettingEventRow {
1109    fn from_event(
1110        event: SettingEvent,
1111        wrapper: &EventWrapper,
1112        body: &EventRequestBody,
1113        first_event_at: chrono::DateTime<chrono::Utc>,
1114        checksum_matched: bool,
1115    ) -> Self {
1116        let semver = body.semver();
1117        let time =
1118            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1119
1120        Self {
1121            app_version: body.app_version.clone(),
1122            major: semver.map(|v| v.major() as i32),
1123            minor: semver.map(|v| v.minor() as i32),
1124            checksum_matched,
1125            patch: semver.map(|v| v.patch() as i32),
1126            release_channel: body.release_channel.clone().unwrap_or_default(),
1127            os_name: body.os_name.clone(),
1128            os_version: body.os_version.clone().unwrap_or_default(),
1129            installation_id: body.installation_id.clone(),
1130            session_id: body.session_id.clone(),
1131            is_staff: body.is_staff,
1132            time: time.timestamp_millis(),
1133            setting: event.setting,
1134            value: event.value,
1135        }
1136    }
1137}
1138
1139#[derive(Serialize, Debug, clickhouse::Row)]
1140pub struct ExtensionEventRow {
1141    // AppInfoBase
1142    app_version: String,
1143    major: Option<i32>,
1144    minor: Option<i32>,
1145    patch: Option<i32>,
1146    checksum_matched: bool,
1147    release_channel: String,
1148    os_name: String,
1149    os_version: String,
1150
1151    // ClientEventBase
1152    installation_id: Option<String>,
1153    session_id: Option<String>,
1154    is_staff: Option<bool>,
1155    time: i64,
1156
1157    // ExtensionEventRow
1158    extension_id: Arc<str>,
1159    extension_version: Arc<str>,
1160    dev: bool,
1161    schema_version: Option<i32>,
1162    wasm_api_version: Option<String>,
1163}
1164
1165impl ExtensionEventRow {
1166    fn from_event(
1167        event: ExtensionEvent,
1168        wrapper: &EventWrapper,
1169        body: &EventRequestBody,
1170        extension_metadata: Option<ExtensionMetadata>,
1171        first_event_at: chrono::DateTime<chrono::Utc>,
1172        checksum_matched: bool,
1173    ) -> Self {
1174        let semver = body.semver();
1175        let time =
1176            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1177
1178        Self {
1179            app_version: body.app_version.clone(),
1180            major: semver.map(|v| v.major() as i32),
1181            minor: semver.map(|v| v.minor() as i32),
1182            patch: semver.map(|v| v.patch() as i32),
1183            checksum_matched,
1184            release_channel: body.release_channel.clone().unwrap_or_default(),
1185            os_name: body.os_name.clone(),
1186            os_version: body.os_version.clone().unwrap_or_default(),
1187            installation_id: body.installation_id.clone(),
1188            session_id: body.session_id.clone(),
1189            is_staff: body.is_staff,
1190            time: time.timestamp_millis(),
1191            extension_id: event.extension_id,
1192            extension_version: event.version,
1193            dev: extension_metadata.is_none(),
1194            schema_version: extension_metadata
1195                .as_ref()
1196                .and_then(|metadata| metadata.manifest.schema_version),
1197            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1198                metadata
1199                    .manifest
1200                    .wasm_api_version
1201                    .as_ref()
1202                    .map(|version| version.to_string())
1203            }),
1204        }
1205    }
1206}
1207
1208#[derive(Serialize, Debug, clickhouse::Row)]
1209pub struct ReplEventRow {
1210    // AppInfoBase
1211    app_version: String,
1212    major: Option<i32>,
1213    minor: Option<i32>,
1214    patch: Option<i32>,
1215    checksum_matched: bool,
1216    release_channel: String,
1217    os_name: String,
1218    os_version: String,
1219
1220    // ClientEventBase
1221    installation_id: Option<String>,
1222    session_id: Option<String>,
1223    is_staff: Option<bool>,
1224    time: i64,
1225
1226    // ReplEventRow
1227    kernel_language: String,
1228    kernel_status: String,
1229    repl_session_id: String,
1230}
1231
1232impl ReplEventRow {
1233    fn from_event(
1234        event: ReplEvent,
1235        wrapper: &EventWrapper,
1236        body: &EventRequestBody,
1237        first_event_at: chrono::DateTime<chrono::Utc>,
1238        checksum_matched: bool,
1239    ) -> Self {
1240        let semver = body.semver();
1241        let time =
1242            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1243
1244        Self {
1245            app_version: body.app_version.clone(),
1246            major: semver.map(|v| v.major() as i32),
1247            minor: semver.map(|v| v.minor() as i32),
1248            patch: semver.map(|v| v.patch() as i32),
1249            checksum_matched,
1250            release_channel: body.release_channel.clone().unwrap_or_default(),
1251            os_name: body.os_name.clone(),
1252            os_version: body.os_version.clone().unwrap_or_default(),
1253            installation_id: body.installation_id.clone(),
1254            session_id: body.session_id.clone(),
1255            is_staff: body.is_staff,
1256            time: time.timestamp_millis(),
1257            kernel_language: event.kernel_language,
1258            kernel_status: event.kernel_status,
1259            repl_session_id: event.repl_session_id,
1260        }
1261    }
1262}
1263
1264#[derive(Serialize, Debug, clickhouse::Row)]
1265pub struct EditEventRow {
1266    // AppInfoBase
1267    app_version: String,
1268    major: Option<i32>,
1269    minor: Option<i32>,
1270    patch: Option<i32>,
1271    checksum_matched: bool,
1272    release_channel: String,
1273    os_name: String,
1274    os_version: String,
1275
1276    // ClientEventBase
1277    installation_id: Option<String>,
1278    // Note: This column name has a typo in the ClickHouse table.
1279    #[serde(rename = "sesssion_id")]
1280    session_id: Option<String>,
1281    is_staff: Option<bool>,
1282    time: i64,
1283
1284    // EditEventRow
1285    period_start: i64,
1286    period_end: i64,
1287    environment: String,
1288    is_via_ssh: bool,
1289}
1290
1291impl EditEventRow {
1292    fn from_event(
1293        event: EditEvent,
1294        wrapper: &EventWrapper,
1295        body: &EventRequestBody,
1296        first_event_at: chrono::DateTime<chrono::Utc>,
1297        checksum_matched: bool,
1298    ) -> Self {
1299        let semver = body.semver();
1300        let time =
1301            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1302
1303        let period_start = time - chrono::Duration::milliseconds(event.duration);
1304        let period_end = time;
1305
1306        Self {
1307            app_version: body.app_version.clone(),
1308            major: semver.map(|v| v.major() as i32),
1309            minor: semver.map(|v| v.minor() as i32),
1310            patch: semver.map(|v| v.patch() as i32),
1311            checksum_matched,
1312            release_channel: body.release_channel.clone().unwrap_or_default(),
1313            os_name: body.os_name.clone(),
1314            os_version: body.os_version.clone().unwrap_or_default(),
1315            installation_id: body.installation_id.clone(),
1316            session_id: body.session_id.clone(),
1317            is_staff: body.is_staff,
1318            time: time.timestamp_millis(),
1319            period_start: period_start.timestamp_millis(),
1320            period_end: period_end.timestamp_millis(),
1321            environment: event.environment,
1322            is_via_ssh: event.is_via_ssh,
1323        }
1324    }
1325}
1326
1327#[derive(Serialize, Debug, clickhouse::Row)]
1328pub struct ActionEventRow {
1329    // AppInfoBase
1330    app_version: String,
1331    major: Option<i32>,
1332    minor: Option<i32>,
1333    patch: Option<i32>,
1334    checksum_matched: bool,
1335    release_channel: String,
1336    os_name: String,
1337    os_version: String,
1338
1339    // ClientEventBase
1340    installation_id: Option<String>,
1341    // Note: This column name has a typo in the ClickHouse table.
1342    #[serde(rename = "sesssion_id")]
1343    session_id: Option<String>,
1344    is_staff: Option<bool>,
1345    time: i64,
1346    // ActionEventRow
1347    source: String,
1348    action: String,
1349}
1350
1351impl ActionEventRow {
1352    fn from_event(
1353        event: ActionEvent,
1354        wrapper: &EventWrapper,
1355        body: &EventRequestBody,
1356        first_event_at: chrono::DateTime<chrono::Utc>,
1357        checksum_matched: bool,
1358    ) -> Self {
1359        let semver = body.semver();
1360        let time =
1361            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1362
1363        Self {
1364            app_version: body.app_version.clone(),
1365            major: semver.map(|v| v.major() as i32),
1366            minor: semver.map(|v| v.minor() as i32),
1367            patch: semver.map(|v| v.patch() as i32),
1368            checksum_matched,
1369            release_channel: body.release_channel.clone().unwrap_or_default(),
1370            os_name: body.os_name.clone(),
1371            os_version: body.os_version.clone().unwrap_or_default(),
1372            installation_id: body.installation_id.clone(),
1373            session_id: body.session_id.clone(),
1374            is_staff: body.is_staff,
1375            time: time.timestamp_millis(),
1376            source: event.source,
1377            action: event.action,
1378        }
1379    }
1380}
1381
1382pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1383    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1384
1385    let mut summer = Sha256::new();
1386    summer.update(checksum_seed);
1387    summer.update(json);
1388    summer.update(checksum_seed);
1389    Some(summer.finalize().into_iter().collect())
1390}
1391
1392fn for_snowflake(
1393    body: EventRequestBody,
1394    first_event_at: chrono::DateTime<chrono::Utc>,
1395) -> impl Iterator<Item = SnowflakeRow> {
1396    body.events.into_iter().map(move |event| {
1397        let timestamp =
1398            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
1399        let (event_type, mut event_properties) = match &event.event {
1400            Event::Editor(e) => (
1401                match e.operation.as_str() {
1402                    "open" => "Editor Opened".to_string(),
1403                    "save" => "Editor Saved".to_string(),
1404                    _ => format!("Unknown Editor Event: {}", e.operation),
1405                },
1406                serde_json::to_value(e).unwrap(),
1407            ),
1408            Event::InlineCompletion(e) => (
1409                format!(
1410                    "Inline Completion {}",
1411                    if e.suggestion_accepted {
1412                        "Accepted"
1413                    } else {
1414                        "Discarded"
1415                    }
1416                ),
1417                serde_json::to_value(e).unwrap(),
1418            ),
1419            Event::Call(e) => {
1420                let event_type = match e.operation.trim() {
1421                    "unshare project" => "Project Unshared".to_string(),
1422                    "open channel notes" => "Channel Notes Opened".to_string(),
1423                    "share project" => "Project Shared".to_string(),
1424                    "join channel" => "Channel Joined".to_string(),
1425                    "hang up" => "Call Ended".to_string(),
1426                    "accept incoming" => "Incoming Call Accepted".to_string(),
1427                    "invite" => "Participant Invited".to_string(),
1428                    "disable microphone" => "Microphone Disabled".to_string(),
1429                    "enable microphone" => "Microphone Enabled".to_string(),
1430                    "enable screen share" => "Screen Share Enabled".to_string(),
1431                    "disable screen share" => "Screen Share Disabled".to_string(),
1432                    "decline incoming" => "Incoming Call Declined".to_string(),
1433                    "enable camera" => "Camera Enabled".to_string(),
1434                    "disable camera" => "Camera Disabled".to_string(),
1435                    _ => format!("Unknown Call Event: {}", e.operation),
1436                };
1437
1438                (event_type, serde_json::to_value(e).unwrap())
1439            }
1440            Event::Assistant(e) => (
1441                match e.phase {
1442                    telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
1443                    telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
1444                    telemetry_events::AssistantPhase::Accepted => {
1445                        "Assistant Response Accepted".to_string()
1446                    }
1447                    telemetry_events::AssistantPhase::Rejected => {
1448                        "Assistant Response Rejected".to_string()
1449                    }
1450                },
1451                serde_json::to_value(e).unwrap(),
1452            ),
1453            Event::Cpu(e) => (
1454                "System CPU Sampled".to_string(),
1455                serde_json::to_value(e).unwrap(),
1456            ),
1457            Event::Memory(e) => (
1458                "System Memory Sampled".to_string(),
1459                serde_json::to_value(e).unwrap(),
1460            ),
1461            Event::App(e) => {
1462                let mut properties = json!({});
1463                let event_type = match e.operation.trim() {
1464                    "extensions: install extension" => "Extension Installed".to_string(),
1465                    "open" => "App Opened".to_string(),
1466                    "project search: open" => "Project Search Opened".to_string(),
1467                    "first open" => {
1468                        properties["is_first_open"] = json!(true);
1469                        "App First Opened".to_string()
1470                    }
1471                    "extensions: uninstall extension" => "Extension Uninstalled".to_string(),
1472                    "welcome page: close" => "Welcome Page Closed".to_string(),
1473                    "open project" => {
1474                        properties["is_first_time"] = json!(false);
1475                        "Project Opened".to_string()
1476                    }
1477                    "welcome page: install cli" => "CLI Installed".to_string(),
1478                    "project diagnostics: open" => "Project Diagnostics Opened".to_string(),
1479                    "extensions page: open" => "Extensions Page Opened".to_string(),
1480                    "welcome page: change theme" => "Welcome Theme Changed".to_string(),
1481                    "welcome page: toggle metric telemetry" => {
1482                        properties["enabled"] = json!(false);
1483                        "Welcome Telemetry Toggled".to_string()
1484                    }
1485                    "welcome page: change keymap" => "Keymap Changed".to_string(),
1486                    "welcome page: toggle vim" => {
1487                        properties["enabled"] = json!(false);
1488                        "Welcome Vim Mode Toggled".to_string()
1489                    }
1490                    "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
1491                    "welcome page: toggle diagnostic telemetry" => {
1492                        "Welcome Telemetry Toggled".to_string()
1493                    }
1494                    "welcome page: open" => "Welcome Page Opened".to_string(),
1495                    "close" => "App Closed".to_string(),
1496                    "markdown preview: open" => "Markdown Preview Opened".to_string(),
1497                    "welcome page: open extensions" => "Extensions Page Opened".to_string(),
1498                    "open node project" | "open pnpm project" | "open yarn project" => {
1499                        properties["project_type"] = json!("node");
1500                        properties["is_first_time"] = json!(false);
1501                        "Project Opened".to_string()
1502                    }
1503                    "repl sessions: open" => "REPL Session Started".to_string(),
1504                    "welcome page: toggle helix" => {
1505                        properties["enabled"] = json!(false);
1506                        "Helix Mode Toggled".to_string()
1507                    }
1508                    "welcome page: edit settings" => {
1509                        properties["changed_settings"] = json!([]);
1510                        "Settings Edited".to_string()
1511                    }
1512                    "welcome page: view docs" => "Documentation Viewed".to_string(),
1513                    "open ssh project" => {
1514                        properties["is_first_time"] = json!(false);
1515                        "SSH Project Opened".to_string()
1516                    }
1517                    "create ssh server" => "SSH Server Created".to_string(),
1518                    "create ssh project" => "SSH Project Created".to_string(),
1519                    "first open for release channel" => {
1520                        properties["is_first_for_channel"] = json!(true);
1521                        "App First Opened For Release Channel".to_string()
1522                    }
1523                    "feature upsell: toggle vim" => {
1524                        properties["source"] = json!("Feature Upsell");
1525                        "Vim Mode Toggled".to_string()
1526                    }
1527                    _ => e
1528                        .operation
1529                        .strip_prefix("feature upsell: viewed docs (")
1530                        .and_then(|s| s.strip_suffix(')'))
1531                        .map_or_else(
1532                            || format!("Unknown App Event: {}", e.operation),
1533                            |docs_url| {
1534                                properties["url"] = json!(docs_url);
1535                                properties["source"] = json!("Feature Upsell");
1536                                "Documentation Viewed".to_string()
1537                            },
1538                        ),
1539                };
1540                (event_type, properties)
1541            }
1542            Event::Setting(e) => (
1543                "Settings Changed".to_string(),
1544                serde_json::to_value(e).unwrap(),
1545            ),
1546            Event::Extension(e) => (
1547                "Extension Loaded".to_string(),
1548                serde_json::to_value(e).unwrap(),
1549            ),
1550            Event::Edit(e) => (
1551                "Editor Edited".to_string(),
1552                serde_json::to_value(e).unwrap(),
1553            ),
1554            Event::Action(e) => (
1555                "Action Invoked".to_string(),
1556                serde_json::to_value(e).unwrap(),
1557            ),
1558            Event::Repl(e) => (
1559                "Kernel Status Changed".to_string(),
1560                serde_json::to_value(e).unwrap(),
1561            ),
1562        };
1563
1564        if let serde_json::Value::Object(ref mut map) = event_properties {
1565            map.insert("app_version".to_string(), body.app_version.clone().into());
1566            map.insert("os_name".to_string(), body.os_name.clone().into());
1567            map.insert("os_version".to_string(), body.os_version.clone().into());
1568            map.insert("architecture".to_string(), body.architecture.clone().into());
1569            map.insert(
1570                "release_channel".to_string(),
1571                body.release_channel.clone().into(),
1572            );
1573            map.insert("signed_in".to_string(), event.signed_in.into());
1574        }
1575
1576        let user_properties = Some(serde_json::json!({
1577            "is_staff": body.is_staff,
1578        }));
1579
1580        SnowflakeRow {
1581            time: timestamp,
1582            user_id: body.metrics_id.clone(),
1583            device_id: body.system_id.clone(),
1584            event_type,
1585            event_properties,
1586            user_properties,
1587            insert_id: Some(Uuid::new_v4().to_string()),
1588        }
1589    })
1590}
1591
1592#[derive(Serialize, Deserialize)]
1593struct SnowflakeRow {
1594    pub time: chrono::DateTime<chrono::Utc>,
1595    pub user_id: Option<String>,
1596    pub device_id: Option<String>,
1597    pub event_type: String,
1598    pub event_properties: serde_json::Value,
1599    pub user_properties: Option<serde_json::Value>,
1600    pub insert_id: Option<String>,
1601}
1602
1603#[derive(Serialize, Deserialize)]
1604struct SnowflakeData {
1605    /// Identifier unique to each Zed installation (differs for stable, preview, dev)
1606    pub installation_id: Option<String>,
1607    /// Identifier unique to each logged in Zed user (randomly generated on first sign in)
1608    /// Identifier unique to each Zed session (differs for each time you open Zed)
1609    pub session_id: Option<String>,
1610    pub metrics_id: Option<String>,
1611    /// True for Zed staff, otherwise false
1612    pub is_staff: Option<bool>,
1613    /// Zed version number
1614    pub app_version: String,
1615    pub os_name: String,
1616    pub os_version: Option<String>,
1617    pub architecture: String,
1618    /// Zed release channel (stable, preview, dev)
1619    pub release_channel: Option<String>,
1620    pub signed_in: bool,
1621
1622    #[serde(flatten)]
1623    pub editor_event: Option<EditorEvent>,
1624    #[serde(flatten)]
1625    pub inline_completion_event: Option<InlineCompletionEvent>,
1626    #[serde(flatten)]
1627    pub call_event: Option<CallEvent>,
1628    #[serde(flatten)]
1629    pub assistant_event: Option<AssistantEvent>,
1630    #[serde(flatten)]
1631    pub cpu_event: Option<CpuEvent>,
1632    #[serde(flatten)]
1633    pub memory_event: Option<MemoryEvent>,
1634    #[serde(flatten)]
1635    pub app_event: Option<AppEvent>,
1636    #[serde(flatten)]
1637    pub setting_event: Option<SettingEvent>,
1638    #[serde(flatten)]
1639    pub extension_event: Option<ExtensionEvent>,
1640    #[serde(flatten)]
1641    pub edit_event: Option<EditEvent>,
1642    #[serde(flatten)]
1643    pub repl_event: Option<ReplEvent>,
1644    #[serde(flatten)]
1645    pub action_event: Option<ActionEvent>,
1646}