events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::clickhouse::write_to_table;
   4use crate::{api::slack, AppState, Error, Result};
   5use anyhow::{anyhow, Context};
   6use aws_sdk_s3::primitives::ByteStream;
   7use axum::{
   8    body::Bytes,
   9    headers::Header,
  10    http::{HeaderMap, HeaderName, StatusCode},
  11    routing::post,
  12    Extension, Router, TypedHeader,
  13};
  14use chrono::Duration;
  15use rpc::ExtensionMetadata;
  16use semantic_version::SemanticVersion;
  17use serde::{Deserialize, Serialize, Serializer};
  18use sha2::{Digest, Sha256};
  19use std::sync::{Arc, OnceLock};
  20use telemetry_events::{
  21    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  22    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic,
  23    ReplEvent, SettingEvent,
  24};
  25use util::ResultExt;
  26use uuid::Uuid;
  27
  28const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports"; // Blob-store bucket that crash (`.ips`) and hang (`.hang.json`) reports are uploaded to.
  29
  30pub fn router() -> Router {
  31    Router::new()
  32        .route("/telemetry/events", post(post_events))
  33        .route("/telemetry/crashes", post(post_crash))
  34        .route("/telemetry/panics", post(post_panic))
  35        .route("/telemetry/hangs", post(post_hang))
  36}
  37
  38pub struct ZedChecksumHeader(Vec<u8>);
  39
  40impl Header for ZedChecksumHeader {
  41    fn name() -> &'static HeaderName {
  42        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  43        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  44    }
  45
  46    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  47    where
  48        Self: Sized,
  49        I: Iterator<Item = &'i axum::http::HeaderValue>,
  50    {
  51        let checksum = values
  52            .next()
  53            .ok_or_else(axum::headers::Error::invalid)?
  54            .to_str()
  55            .map_err(|_| axum::headers::Error::invalid())?;
  56
  57        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  58        Ok(Self(bytes))
  59    }
  60
  61    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  62        unimplemented!()
  63    }
  64}
  65
  66pub async fn post_crash(
  67    Extension(app): Extension<Arc<AppState>>,
  68    headers: HeaderMap,
  69    body: Bytes,
  70) -> Result<()> {
  71    let report = IpsFile::parse(&body)?;
  72    let version_threshold = SemanticVersion::new(0, 123, 0);
  73
  74    let bundle_id = &report.header.bundle_id;
  75    let app_version = &report.app_version();
  76
  77    if bundle_id == "dev.zed.Zed-Dev" {
  78        log::error!("Crash uploads from {} are ignored.", bundle_id);
  79        return Ok(());
  80    }
  81
  82    if app_version.is_none() || app_version.unwrap() < version_threshold {
  83        log::error!(
  84            "Crash uploads from {} are ignored.",
  85            report.header.app_version
  86        );
  87        return Ok(());
  88    }
  89    let app_version = app_version.unwrap();
  90
  91    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  92        let response = blob_store_client
  93            .head_object()
  94            .bucket(CRASH_REPORTS_BUCKET)
  95            .key(report.header.incident_id.clone() + ".ips")
  96            .send()
  97            .await;
  98
  99        if response.is_ok() {
 100            log::info!("We've already uploaded this crash");
 101            return Ok(());
 102        }
 103
 104        blob_store_client
 105            .put_object()
 106            .bucket(CRASH_REPORTS_BUCKET)
 107            .key(report.header.incident_id.clone() + ".ips")
 108            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 109            .body(ByteStream::from(body.to_vec()))
 110            .send()
 111            .await
 112            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 113            .ok();
 114    }
 115
 116    let recent_panic_on: Option<i64> = headers
 117        .get("x-zed-panicked-on")
 118        .and_then(|h| h.to_str().ok())
 119        .and_then(|s| s.parse().ok());
 120
 121    let installation_id = headers
 122        .get("x-zed-installation-id")
 123        .and_then(|h| h.to_str().ok())
 124        .map(|s| s.to_string())
 125        .unwrap_or_default();
 126
 127    let mut recent_panic = None;
 128
 129    if let Some(recent_panic_on) = recent_panic_on {
 130        let crashed_at = match report.timestamp() {
 131            Ok(t) => Some(t),
 132            Err(e) => {
 133                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 134                None
 135            }
 136        };
 137        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 138            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 139        }
 140    }
 141
 142    let description = report.description(recent_panic);
 143    let summary = report.backtrace_summary();
 144
 145    tracing::error!(
 146        service = "client",
 147        version = %report.header.app_version,
 148        os_version = %report.header.os_version,
 149        bundle_id = %report.header.bundle_id,
 150        incident_id = %report.header.incident_id,
 151        installation_id = %installation_id,
 152        description = %description,
 153        backtrace = %summary,
 154        "crash report"
 155    );
 156
 157    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 158        let payload = slack::WebhookBody::new(|w| {
 159            w.add_section(|s| s.text(slack::Text::markdown(description)))
 160                .add_section(|s| {
 161                    s.add_field(slack::Text::markdown(format!(
 162                        "*Version:*\n{} ({})",
 163                        bundle_id, app_version
 164                    )))
 165                    .add_field({
 166                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 167                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 168                            hostname.strip_prefix("http://").unwrap_or_default()
 169                        });
 170
 171                        slack::Text::markdown(format!(
 172                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 173                            CRASH_REPORTS_BUCKET,
 174                            hostname,
 175                            report.header.incident_id,
 176                            report
 177                                .header
 178                                .incident_id
 179                                .chars()
 180                                .take(8)
 181                                .collect::<String>(),
 182                        ))
 183                    })
 184                })
 185                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 186        });
 187        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 188            log::error!("Failed to serialize payload to JSON: {err}");
 189            Error::Internal(anyhow!(err))
 190        })?;
 191
 192        reqwest::Client::new()
 193            .post(slack_panics_webhook)
 194            .header("Content-Type", "application/json")
 195            .body(payload_json)
 196            .send()
 197            .await
 198            .map_err(|err| {
 199                log::error!("Failed to send payload to Slack: {err}");
 200                Error::Internal(anyhow!(err))
 201            })?;
 202    }
 203
 204    Ok(())
 205}
 206
 207pub async fn post_hang(
 208    Extension(app): Extension<Arc<AppState>>,
 209    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 210    body: Bytes,
 211) -> Result<()> {
 212    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 213        return Err(Error::http(
 214            StatusCode::INTERNAL_SERVER_ERROR,
 215            "events not enabled".into(),
 216        ))?;
 217    };
 218
 219    if checksum != expected {
 220        return Err(Error::http(
 221            StatusCode::BAD_REQUEST,
 222            "invalid checksum".into(),
 223        ))?;
 224    }
 225
 226    let incident_id = Uuid::new_v4().to_string();
 227
 228    // dump JSON into S3 so we can get frame offsets if we need to.
 229    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 230        blob_store_client
 231            .put_object()
 232            .bucket(CRASH_REPORTS_BUCKET)
 233            .key(incident_id.clone() + ".hang.json")
 234            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 235            .body(ByteStream::from(body.to_vec()))
 236            .send()
 237            .await
 238            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 239            .ok();
 240    }
 241
 242    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 243        log::error!("can't parse report json: {err}");
 244        Error::Internal(anyhow!(err))
 245    })?;
 246
 247    let mut backtrace = "Possible hang detected on main thread:".to_string();
 248    let unknown = "<unknown>".to_string();
 249    for frame in report.backtrace.iter() {
 250        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 251    }
 252
 253    tracing::error!(
 254        service = "client",
 255        version = %report.app_version.unwrap_or_default().to_string(),
 256        os_name = %report.os_name,
 257        os_version = report.os_version.unwrap_or_default().to_string(),
 258        incident_id = %incident_id,
 259        installation_id = %report.installation_id.unwrap_or_default(),
 260        backtrace = %backtrace,
 261        "hang report");
 262
 263    Ok(())
 264}
 265
 266pub async fn post_panic(
 267    Extension(app): Extension<Arc<AppState>>,
 268    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 269    body: Bytes,
 270) -> Result<()> {
 271    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 272        return Err(Error::http(
 273            StatusCode::INTERNAL_SERVER_ERROR,
 274            "events not enabled".into(),
 275        ))?;
 276    };
 277
 278    if checksum != expected {
 279        return Err(Error::http(
 280            StatusCode::BAD_REQUEST,
 281            "invalid checksum".into(),
 282        ))?;
 283    }
 284
 285    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 286        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 287    let panic = report.panic;
 288
 289    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 290        return Err(Error::http(
 291            StatusCode::BAD_REQUEST,
 292            "invalid os version".into(),
 293        ))?;
 294    }
 295
 296    tracing::error!(
 297        service = "client",
 298        version = %panic.app_version,
 299        os_name = %panic.os_name,
 300        os_version = %panic.os_version.clone().unwrap_or_default(),
 301        installation_id = %panic.installation_id.clone().unwrap_or_default(),
 302        description = %panic.payload,
 303        backtrace = %panic.backtrace.join("\n"),
 304        "panic report"
 305    );
 306
 307    let backtrace = if panic.backtrace.len() > 25 {
 308        let total = panic.backtrace.len();
 309        format!(
 310            "{}\n   and {} more",
 311            panic
 312                .backtrace
 313                .iter()
 314                .take(20)
 315                .cloned()
 316                .collect::<Vec<_>>()
 317                .join("\n"),
 318            total - 20
 319        )
 320    } else {
 321        panic.backtrace.join("\n")
 322    };
 323
 324    if !report_to_slack(&panic) {
 325        return Ok(());
 326    }
 327
 328    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 329
 330    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 331        let payload = slack::WebhookBody::new(|w| {
 332            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 333                .add_section(|s| {
 334                    s.add_field(slack::Text::markdown(format!(
 335                        "*Version:*\n {} ",
 336                        panic.app_version
 337                    )))
 338                    .add_field({
 339                        slack::Text::markdown(format!(
 340                            "*OS:*\n{} {}",
 341                            panic.os_name,
 342                            panic.os_version.unwrap_or_default()
 343                        ))
 344                    })
 345                })
 346                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 347        });
 348        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 349            log::error!("Failed to serialize payload to JSON: {err}");
 350            Error::Internal(anyhow!(err))
 351        })?;
 352
 353        reqwest::Client::new()
 354            .post(slack_panics_webhook)
 355            .header("Content-Type", "application/json")
 356            .body(payload_json)
 357            .send()
 358            .await
 359            .map_err(|err| {
 360                log::error!("Failed to send payload to Slack: {err}");
 361                Error::Internal(anyhow!(err))
 362            })?;
 363    }
 364
 365    Ok(())
 366}
 367
 368fn report_to_slack(panic: &Panic) -> bool {
 369    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
 370        return false;
 371    }
 372
 373    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
 374        return false;
 375    }
 376
 377    if panic
 378        .payload
 379        .contains("GPU has crashed, and no debug information is available")
 380    {
 381        return false;
 382    }
 383
 384    true
 385}
 386
 387pub async fn post_events(
 388    Extension(app): Extension<Arc<AppState>>,
 389    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 390    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 391    body: Bytes,
 392) -> Result<()> {
 393    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 394        return Err(Error::http(
 395            StatusCode::INTERNAL_SERVER_ERROR,
 396            "events not enabled".into(),
 397        ))?;
 398    };
 399
 400    let checksum_matched = checksum == expected;
 401
 402    let request_body: telemetry_events::EventRequestBody =
 403        serde_json::from_slice(&body).map_err(|err| {
 404            log::error!("can't parse event json: {err}");
 405            Error::Internal(anyhow!(err))
 406        })?;
 407
 408    let mut to_upload = ToUpload::default();
 409    let Some(last_event) = request_body.events.last() else {
 410        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 411    };
 412    let country_code = country_code_header.map(|h| h.to_string());
 413
 414    let first_event_at = chrono::Utc::now()
 415        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 416
 417    if let Some(kinesis_client) = app.kinesis_client.clone() {
 418        if let Some(stream) = app.config.kinesis_stream.clone() {
 419            let mut request = kinesis_client.put_records().stream_name(stream);
 420            for row in for_snowflake(request_body.clone(), first_event_at) {
 421                if let Some(data) = serde_json::to_vec(&row).log_err() {
 422                    request = request.records(
 423                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
 424                            .partition_key(request_body.system_id.clone().unwrap_or_default())
 425                            .data(data.into())
 426                            .build()
 427                            .unwrap(),
 428                    );
 429                }
 430            }
 431            request.send().await.log_err();
 432        }
 433    };
 434
 435    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 436        Err(Error::http(
 437            StatusCode::NOT_IMPLEMENTED,
 438            "not supported".into(),
 439        ))?
 440    };
 441
 442    let first_event_at = chrono::Utc::now()
 443        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 444
 445    for wrapper in &request_body.events {
 446        match &wrapper.event {
 447            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 448                event.clone(),
 449                wrapper,
 450                &request_body,
 451                first_event_at,
 452                country_code.clone(),
 453                checksum_matched,
 454            )),
 455            Event::InlineCompletion(event) => {
 456                to_upload
 457                    .inline_completion_events
 458                    .push(InlineCompletionEventRow::from_event(
 459                        event.clone(),
 460                        wrapper,
 461                        &request_body,
 462                        first_event_at,
 463                        country_code.clone(),
 464                        checksum_matched,
 465                    ))
 466            }
 467            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 468                event.clone(),
 469                wrapper,
 470                &request_body,
 471                first_event_at,
 472                checksum_matched,
 473            )),
 474            Event::Assistant(event) => {
 475                to_upload
 476                    .assistant_events
 477                    .push(AssistantEventRow::from_event(
 478                        event.clone(),
 479                        wrapper,
 480                        &request_body,
 481                        first_event_at,
 482                        checksum_matched,
 483                    ))
 484            }
 485            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 486                event.clone(),
 487                wrapper,
 488                &request_body,
 489                first_event_at,
 490                checksum_matched,
 491            )),
 492            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 493                event.clone(),
 494                wrapper,
 495                &request_body,
 496                first_event_at,
 497                checksum_matched,
 498            )),
 499            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 500                event.clone(),
 501                wrapper,
 502                &request_body,
 503                first_event_at,
 504                checksum_matched,
 505            )),
 506            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 507                event.clone(),
 508                wrapper,
 509                &request_body,
 510                first_event_at,
 511                checksum_matched,
 512            )),
 513            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 514                event.clone(),
 515                wrapper,
 516                &request_body,
 517                first_event_at,
 518                checksum_matched,
 519            )),
 520            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 521                event.clone(),
 522                wrapper,
 523                &request_body,
 524                first_event_at,
 525                checksum_matched,
 526            )),
 527            Event::Extension(event) => {
 528                let metadata = app
 529                    .db
 530                    .get_extension_version(&event.extension_id, &event.version)
 531                    .await?;
 532                to_upload
 533                    .extension_events
 534                    .push(ExtensionEventRow::from_event(
 535                        event.clone(),
 536                        wrapper,
 537                        &request_body,
 538                        metadata,
 539                        first_event_at,
 540                        checksum_matched,
 541                    ))
 542            }
 543            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 544                event.clone(),
 545                wrapper,
 546                &request_body,
 547                first_event_at,
 548                checksum_matched,
 549            )),
 550        }
 551    }
 552
 553    to_upload
 554        .upload(&clickhouse_client)
 555        .await
 556        .map_err(|err| Error::Internal(anyhow!(err)))?;
 557
 558    Ok(())
 559}
 560
 561#[derive(Default)]
 562struct ToUpload {
 563    editor_events: Vec<EditorEventRow>,
 564    inline_completion_events: Vec<InlineCompletionEventRow>,
 565    assistant_events: Vec<AssistantEventRow>,
 566    call_events: Vec<CallEventRow>,
 567    cpu_events: Vec<CpuEventRow>,
 568    memory_events: Vec<MemoryEventRow>,
 569    app_events: Vec<AppEventRow>,
 570    setting_events: Vec<SettingEventRow>,
 571    extension_events: Vec<ExtensionEventRow>,
 572    edit_events: Vec<EditEventRow>,
 573    action_events: Vec<ActionEventRow>,
 574    repl_events: Vec<ReplEventRow>,
 575}
 576
 577impl ToUpload {
 578    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 579        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 580        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 581            .await
 582            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 583
 584        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 585        write_to_table(
 586            INLINE_COMPLETION_EVENTS_TABLE,
 587            &self.inline_completion_events,
 588            clickhouse_client,
 589        )
 590        .await
 591        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 592
 593        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 594        write_to_table(
 595            ASSISTANT_EVENTS_TABLE,
 596            &self.assistant_events,
 597            clickhouse_client,
 598        )
 599        .await
 600        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 601
 602        const CALL_EVENTS_TABLE: &str = "call_events";
 603        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 604            .await
 605            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 606
 607        const CPU_EVENTS_TABLE: &str = "cpu_events";
 608        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 609            .await
 610            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 611
 612        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 613        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 614            .await
 615            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 616
 617        const APP_EVENTS_TABLE: &str = "app_events";
 618        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 619            .await
 620            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 621
 622        const SETTING_EVENTS_TABLE: &str = "setting_events";
 623        write_to_table(
 624            SETTING_EVENTS_TABLE,
 625            &self.setting_events,
 626            clickhouse_client,
 627        )
 628        .await
 629        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 630
 631        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 632        write_to_table(
 633            EXTENSION_EVENTS_TABLE,
 634            &self.extension_events,
 635            clickhouse_client,
 636        )
 637        .await
 638        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 639
 640        const EDIT_EVENTS_TABLE: &str = "edit_events";
 641        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 642            .await
 643            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 644
 645        const ACTION_EVENTS_TABLE: &str = "action_events";
 646        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 647            .await
 648            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 649
 650        const REPL_EVENTS_TABLE: &str = "repl_events";
 651        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 652            .await
 653            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 654
 655        Ok(())
 656    }
 657}
 658
 659pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 660where
 661    S: Serializer,
 662{
 663    if country_code.len() != 2 {
 664        use serde::ser::Error;
 665        return Err(S::Error::custom(
 666            "country_code must be exactly 2 characters",
 667        ));
 668    }
 669
 670    let country_code = country_code.as_bytes();
 671
 672    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 673}
 674
 675#[derive(Serialize, Debug, clickhouse::Row)]
 676pub struct EditorEventRow {
 677    system_id: String,
 678    installation_id: String,
 679    session_id: Option<String>,
 680    metrics_id: String,
 681    operation: String,
 682    app_version: String,
 683    file_extension: String,
 684    os_name: String,
 685    os_version: String,
 686    release_channel: String,
 687    signed_in: bool,
 688    vim_mode: bool,
 689    #[serde(serialize_with = "serialize_country_code")]
 690    country_code: String,
 691    region_code: String,
 692    city: String,
 693    time: i64,
 694    copilot_enabled: bool,
 695    copilot_enabled_for_language: bool,
 696    architecture: String,
 697    is_staff: Option<bool>,
 698    major: Option<i32>,
 699    minor: Option<i32>,
 700    patch: Option<i32>,
 701    checksum_matched: bool,
 702    is_via_ssh: bool,
 703}
 704
 705impl EditorEventRow {
 706    fn from_event(
 707        event: EditorEvent,
 708        wrapper: &EventWrapper,
 709        body: &EventRequestBody,
 710        first_event_at: chrono::DateTime<chrono::Utc>,
 711        country_code: Option<String>,
 712        checksum_matched: bool,
 713    ) -> Self {
 714        let semver = body.semver();
 715        let time =
 716            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 717
 718        Self {
 719            app_version: body.app_version.clone(),
 720            major: semver.map(|v| v.major() as i32),
 721            minor: semver.map(|v| v.minor() as i32),
 722            patch: semver.map(|v| v.patch() as i32),
 723            checksum_matched,
 724            release_channel: body.release_channel.clone().unwrap_or_default(),
 725            os_name: body.os_name.clone(),
 726            os_version: body.os_version.clone().unwrap_or_default(),
 727            architecture: body.architecture.clone(),
 728            system_id: body.system_id.clone().unwrap_or_default(),
 729            installation_id: body.installation_id.clone().unwrap_or_default(),
 730            session_id: body.session_id.clone(),
 731            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 732            is_staff: body.is_staff,
 733            time: time.timestamp_millis(),
 734            operation: event.operation,
 735            file_extension: event.file_extension.unwrap_or_default(),
 736            signed_in: wrapper.signed_in,
 737            vim_mode: event.vim_mode,
 738            copilot_enabled: event.copilot_enabled,
 739            copilot_enabled_for_language: event.copilot_enabled_for_language,
 740            country_code: country_code.unwrap_or("XX".to_string()),
 741            region_code: "".to_string(),
 742            city: "".to_string(),
 743            is_via_ssh: event.is_via_ssh,
 744        }
 745    }
 746}
 747
 748#[derive(Serialize, Debug, clickhouse::Row)]
 749pub struct InlineCompletionEventRow {
 750    installation_id: String,
 751    session_id: Option<String>,
 752    provider: String,
 753    suggestion_accepted: bool,
 754    app_version: String,
 755    file_extension: String,
 756    os_name: String,
 757    os_version: String,
 758    release_channel: String,
 759    signed_in: bool,
 760    #[serde(serialize_with = "serialize_country_code")]
 761    country_code: String,
 762    region_code: String,
 763    city: String,
 764    time: i64,
 765    is_staff: Option<bool>,
 766    major: Option<i32>,
 767    minor: Option<i32>,
 768    patch: Option<i32>,
 769    checksum_matched: bool,
 770}
 771
 772impl InlineCompletionEventRow {
 773    fn from_event(
 774        event: InlineCompletionEvent,
 775        wrapper: &EventWrapper,
 776        body: &EventRequestBody,
 777        first_event_at: chrono::DateTime<chrono::Utc>,
 778        country_code: Option<String>,
 779        checksum_matched: bool,
 780    ) -> Self {
 781        let semver = body.semver();
 782        let time =
 783            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 784
 785        Self {
 786            app_version: body.app_version.clone(),
 787            major: semver.map(|v| v.major() as i32),
 788            minor: semver.map(|v| v.minor() as i32),
 789            patch: semver.map(|v| v.patch() as i32),
 790            checksum_matched,
 791            release_channel: body.release_channel.clone().unwrap_or_default(),
 792            os_name: body.os_name.clone(),
 793            os_version: body.os_version.clone().unwrap_or_default(),
 794            installation_id: body.installation_id.clone().unwrap_or_default(),
 795            session_id: body.session_id.clone(),
 796            is_staff: body.is_staff,
 797            time: time.timestamp_millis(),
 798            file_extension: event.file_extension.unwrap_or_default(),
 799            signed_in: wrapper.signed_in,
 800            country_code: country_code.unwrap_or("XX".to_string()),
 801            region_code: "".to_string(),
 802            city: "".to_string(),
 803            provider: event.provider,
 804            suggestion_accepted: event.suggestion_accepted,
 805        }
 806    }
 807}
 808
 809#[derive(Serialize, Debug, clickhouse::Row)]
 810pub struct CallEventRow {
 811    // AppInfoBase
 812    app_version: String,
 813    major: Option<i32>,
 814    minor: Option<i32>,
 815    patch: Option<i32>,
 816    release_channel: String,
 817    os_name: String,
 818    os_version: String,
 819    checksum_matched: bool,
 820
 821    // ClientEventBase
 822    installation_id: String,
 823    session_id: Option<String>,
 824    is_staff: Option<bool>,
 825    time: i64,
 826
 827    // CallEventRow
 828    operation: String,
 829    room_id: Option<u64>,
 830    channel_id: Option<u64>,
 831}
 832
 833impl CallEventRow {
 834    fn from_event(
 835        event: CallEvent,
 836        wrapper: &EventWrapper,
 837        body: &EventRequestBody,
 838        first_event_at: chrono::DateTime<chrono::Utc>,
 839        checksum_matched: bool,
 840    ) -> Self {
 841        let semver = body.semver();
 842        let time =
 843            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 844
 845        Self {
 846            app_version: body.app_version.clone(),
 847            major: semver.map(|v| v.major() as i32),
 848            minor: semver.map(|v| v.minor() as i32),
 849            patch: semver.map(|v| v.patch() as i32),
 850            checksum_matched,
 851            release_channel: body.release_channel.clone().unwrap_or_default(),
 852            os_name: body.os_name.clone(),
 853            os_version: body.os_version.clone().unwrap_or_default(),
 854            installation_id: body.installation_id.clone().unwrap_or_default(),
 855            session_id: body.session_id.clone(),
 856            is_staff: body.is_staff,
 857            time: time.timestamp_millis(),
 858            operation: event.operation,
 859            room_id: event.room_id,
 860            channel_id: event.channel_id,
 861        }
 862    }
 863}
 864
/// Row for a single assistant telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `AssistantEventRow::from_event` from an `AssistantEvent`
/// and the metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AssistantEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // AssistantEventRow
    // Empty string when the event carries no conversation id.
    conversation_id: String,
    // `kind` and `phase` hold the `to_string()` form of the event's values.
    kind: String,
    phase: String,
    model: String,
    response_latency_in_ms: Option<i64>,
    error_message: Option<String>,
}
 891
 892impl AssistantEventRow {
 893    fn from_event(
 894        event: AssistantEvent,
 895        wrapper: &EventWrapper,
 896        body: &EventRequestBody,
 897        first_event_at: chrono::DateTime<chrono::Utc>,
 898        checksum_matched: bool,
 899    ) -> Self {
 900        let semver = body.semver();
 901        let time =
 902            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 903
 904        Self {
 905            app_version: body.app_version.clone(),
 906            major: semver.map(|v| v.major() as i32),
 907            minor: semver.map(|v| v.minor() as i32),
 908            patch: semver.map(|v| v.patch() as i32),
 909            checksum_matched,
 910            release_channel: body.release_channel.clone().unwrap_or_default(),
 911            os_name: body.os_name.clone(),
 912            os_version: body.os_version.clone().unwrap_or_default(),
 913            installation_id: body.installation_id.clone(),
 914            session_id: body.session_id.clone(),
 915            is_staff: body.is_staff,
 916            time: time.timestamp_millis(),
 917            conversation_id: event.conversation_id.unwrap_or_default(),
 918            kind: event.kind.to_string(),
 919            phase: event.phase.to_string(),
 920            model: event.model,
 921            response_latency_in_ms: event
 922                .response_latency
 923                .map(|latency| latency.as_millis() as i64),
 924            error_message: event.error_message,
 925        }
 926    }
 927}
 928
/// Row for a single CPU usage telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `CpuEventRow::from_event` from a `CpuEvent` and the
/// metadata of the request that carried it.
#[derive(Debug, clickhouse::Row, Serialize)]
pub struct CpuEventRow {
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    usage_as_percentage: f32,
    core_count: u32,
    app_version: String,
    release_channel: String,
    os_name: String,
    os_version: String,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // pub normalized_cpu_usage: f64, MATERIALIZED
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
}
 947
 948impl CpuEventRow {
 949    fn from_event(
 950        event: CpuEvent,
 951        wrapper: &EventWrapper,
 952        body: &EventRequestBody,
 953        first_event_at: chrono::DateTime<chrono::Utc>,
 954        checksum_matched: bool,
 955    ) -> Self {
 956        let semver = body.semver();
 957        let time =
 958            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 959
 960        Self {
 961            app_version: body.app_version.clone(),
 962            major: semver.map(|v| v.major() as i32),
 963            minor: semver.map(|v| v.minor() as i32),
 964            patch: semver.map(|v| v.patch() as i32),
 965            checksum_matched,
 966            release_channel: body.release_channel.clone().unwrap_or_default(),
 967            os_name: body.os_name.clone(),
 968            os_version: body.os_version.clone().unwrap_or_default(),
 969            installation_id: body.installation_id.clone(),
 970            session_id: body.session_id.clone(),
 971            is_staff: body.is_staff,
 972            time: time.timestamp_millis(),
 973            usage_as_percentage: event.usage_as_percentage,
 974            core_count: event.core_count,
 975        }
 976    }
 977}
 978
/// Row for a single memory usage telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `MemoryEventRow::from_event` from a `MemoryEvent` and the
/// metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct MemoryEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // MemoryEventRow
    memory_in_bytes: u64,
    virtual_memory_in_bytes: u64,
}
1001
1002impl MemoryEventRow {
1003    fn from_event(
1004        event: MemoryEvent,
1005        wrapper: &EventWrapper,
1006        body: &EventRequestBody,
1007        first_event_at: chrono::DateTime<chrono::Utc>,
1008        checksum_matched: bool,
1009    ) -> Self {
1010        let semver = body.semver();
1011        let time =
1012            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1013
1014        Self {
1015            app_version: body.app_version.clone(),
1016            major: semver.map(|v| v.major() as i32),
1017            minor: semver.map(|v| v.minor() as i32),
1018            patch: semver.map(|v| v.patch() as i32),
1019            checksum_matched,
1020            release_channel: body.release_channel.clone().unwrap_or_default(),
1021            os_name: body.os_name.clone(),
1022            os_version: body.os_version.clone().unwrap_or_default(),
1023            installation_id: body.installation_id.clone(),
1024            session_id: body.session_id.clone(),
1025            is_staff: body.is_staff,
1026            time: time.timestamp_millis(),
1027            memory_in_bytes: event.memory_in_bytes,
1028            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1029        }
1030    }
1031}
1032
/// Row for a single app telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `AppEventRow::from_event` from an `AppEvent` and the
/// metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AppEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // AppEventRow
    operation: String,
}
1054
1055impl AppEventRow {
1056    fn from_event(
1057        event: AppEvent,
1058        wrapper: &EventWrapper,
1059        body: &EventRequestBody,
1060        first_event_at: chrono::DateTime<chrono::Utc>,
1061        checksum_matched: bool,
1062    ) -> Self {
1063        let semver = body.semver();
1064        let time =
1065            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1066
1067        Self {
1068            app_version: body.app_version.clone(),
1069            major: semver.map(|v| v.major() as i32),
1070            minor: semver.map(|v| v.minor() as i32),
1071            patch: semver.map(|v| v.patch() as i32),
1072            checksum_matched,
1073            release_channel: body.release_channel.clone().unwrap_or_default(),
1074            os_name: body.os_name.clone(),
1075            os_version: body.os_version.clone().unwrap_or_default(),
1076            installation_id: body.installation_id.clone(),
1077            session_id: body.session_id.clone(),
1078            is_staff: body.is_staff,
1079            time: time.timestamp_millis(),
1080            operation: event.operation,
1081        }
1082    }
1083}
1084
/// Row for a single setting telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `SettingEventRow::from_event` from a `SettingEvent` and the
/// metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // SettingEventRow
    setting: String,
    value: String,
}
1106
1107impl SettingEventRow {
1108    fn from_event(
1109        event: SettingEvent,
1110        wrapper: &EventWrapper,
1111        body: &EventRequestBody,
1112        first_event_at: chrono::DateTime<chrono::Utc>,
1113        checksum_matched: bool,
1114    ) -> Self {
1115        let semver = body.semver();
1116        let time =
1117            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1118
1119        Self {
1120            app_version: body.app_version.clone(),
1121            major: semver.map(|v| v.major() as i32),
1122            minor: semver.map(|v| v.minor() as i32),
1123            checksum_matched,
1124            patch: semver.map(|v| v.patch() as i32),
1125            release_channel: body.release_channel.clone().unwrap_or_default(),
1126            os_name: body.os_name.clone(),
1127            os_version: body.os_version.clone().unwrap_or_default(),
1128            installation_id: body.installation_id.clone(),
1129            session_id: body.session_id.clone(),
1130            is_staff: body.is_staff,
1131            time: time.timestamp_millis(),
1132            setting: event.setting,
1133            value: event.value,
1134        }
1135    }
1136}
1137
/// Row for a single extension telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `ExtensionEventRow::from_event` from an `ExtensionEvent`,
/// optional `ExtensionMetadata`, and the metadata of the request that carried
/// the event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // ExtensionEventRow
    extension_id: Arc<str>,
    extension_version: Arc<str>,
    // Set to `true` by `from_event` when no extension metadata was supplied.
    dev: bool,
    // Both taken from the metadata's manifest; `None` without metadata.
    schema_version: Option<i32>,
    wasm_api_version: Option<String>,
}
1163
1164impl ExtensionEventRow {
1165    fn from_event(
1166        event: ExtensionEvent,
1167        wrapper: &EventWrapper,
1168        body: &EventRequestBody,
1169        extension_metadata: Option<ExtensionMetadata>,
1170        first_event_at: chrono::DateTime<chrono::Utc>,
1171        checksum_matched: bool,
1172    ) -> Self {
1173        let semver = body.semver();
1174        let time =
1175            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1176
1177        Self {
1178            app_version: body.app_version.clone(),
1179            major: semver.map(|v| v.major() as i32),
1180            minor: semver.map(|v| v.minor() as i32),
1181            patch: semver.map(|v| v.patch() as i32),
1182            checksum_matched,
1183            release_channel: body.release_channel.clone().unwrap_or_default(),
1184            os_name: body.os_name.clone(),
1185            os_version: body.os_version.clone().unwrap_or_default(),
1186            installation_id: body.installation_id.clone(),
1187            session_id: body.session_id.clone(),
1188            is_staff: body.is_staff,
1189            time: time.timestamp_millis(),
1190            extension_id: event.extension_id,
1191            extension_version: event.version,
1192            dev: extension_metadata.is_none(),
1193            schema_version: extension_metadata
1194                .as_ref()
1195                .and_then(|metadata| metadata.manifest.schema_version),
1196            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1197                metadata
1198                    .manifest
1199                    .wasm_api_version
1200                    .as_ref()
1201                    .map(|version| version.to_string())
1202            }),
1203        }
1204    }
1205}
1206
/// Row for a single REPL telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `ReplEventRow::from_event` from a `ReplEvent` and the
/// metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ReplEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // ReplEventRow
    kernel_language: String,
    kernel_status: String,
    repl_session_id: String,
}
1230
1231impl ReplEventRow {
1232    fn from_event(
1233        event: ReplEvent,
1234        wrapper: &EventWrapper,
1235        body: &EventRequestBody,
1236        first_event_at: chrono::DateTime<chrono::Utc>,
1237        checksum_matched: bool,
1238    ) -> Self {
1239        let semver = body.semver();
1240        let time =
1241            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1242
1243        Self {
1244            app_version: body.app_version.clone(),
1245            major: semver.map(|v| v.major() as i32),
1246            minor: semver.map(|v| v.minor() as i32),
1247            patch: semver.map(|v| v.patch() as i32),
1248            checksum_matched,
1249            release_channel: body.release_channel.clone().unwrap_or_default(),
1250            os_name: body.os_name.clone(),
1251            os_version: body.os_version.clone().unwrap_or_default(),
1252            installation_id: body.installation_id.clone(),
1253            session_id: body.session_id.clone(),
1254            is_staff: body.is_staff,
1255            time: time.timestamp_millis(),
1256            kernel_language: event.kernel_language,
1257            kernel_status: event.kernel_status,
1258            repl_session_id: event.repl_session_id,
1259        }
1260    }
1261}
1262
/// Row for a single edit telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `EditEventRow::from_event` from an `EditEvent` and the
/// metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // EditEventRow
    // Both bounds are epoch milliseconds. `from_event` sets `period_end` to
    // the event time and `period_start` to that time minus the event's
    // duration.
    period_start: i64,
    period_end: i64,
    environment: String,
    is_via_ssh: bool,
}
1289
1290impl EditEventRow {
1291    fn from_event(
1292        event: EditEvent,
1293        wrapper: &EventWrapper,
1294        body: &EventRequestBody,
1295        first_event_at: chrono::DateTime<chrono::Utc>,
1296        checksum_matched: bool,
1297    ) -> Self {
1298        let semver = body.semver();
1299        let time =
1300            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1301
1302        let period_start = time - chrono::Duration::milliseconds(event.duration);
1303        let period_end = time;
1304
1305        Self {
1306            app_version: body.app_version.clone(),
1307            major: semver.map(|v| v.major() as i32),
1308            minor: semver.map(|v| v.minor() as i32),
1309            patch: semver.map(|v| v.patch() as i32),
1310            checksum_matched,
1311            release_channel: body.release_channel.clone().unwrap_or_default(),
1312            os_name: body.os_name.clone(),
1313            os_version: body.os_version.clone().unwrap_or_default(),
1314            installation_id: body.installation_id.clone(),
1315            session_id: body.session_id.clone(),
1316            is_staff: body.is_staff,
1317            time: time.timestamp_millis(),
1318            period_start: period_start.timestamp_millis(),
1319            period_end: period_end.timestamp_millis(),
1320            environment: event.environment,
1321            is_via_ssh: event.is_via_ssh,
1322        }
1323    }
1324}
1325
/// Row for a single action telemetry event, serialized via `clickhouse::Row`.
///
/// Constructed by `ActionEventRow::from_event` from an `ActionEvent` and the
/// metadata of the request that carried it.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    app_version: String,
    // Version components; `None` when the request body yields no semver.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    checksum_matched: bool,
    release_channel: String,
    os_name: String,
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // ActionEventRow
    source: String,
    action: String,
}
1349
1350impl ActionEventRow {
1351    fn from_event(
1352        event: ActionEvent,
1353        wrapper: &EventWrapper,
1354        body: &EventRequestBody,
1355        first_event_at: chrono::DateTime<chrono::Utc>,
1356        checksum_matched: bool,
1357    ) -> Self {
1358        let semver = body.semver();
1359        let time =
1360            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1361
1362        Self {
1363            app_version: body.app_version.clone(),
1364            major: semver.map(|v| v.major() as i32),
1365            minor: semver.map(|v| v.minor() as i32),
1366            patch: semver.map(|v| v.patch() as i32),
1367            checksum_matched,
1368            release_channel: body.release_channel.clone().unwrap_or_default(),
1369            os_name: body.os_name.clone(),
1370            os_version: body.os_version.clone().unwrap_or_default(),
1371            installation_id: body.installation_id.clone(),
1372            session_id: body.session_id.clone(),
1373            is_staff: body.is_staff,
1374            time: time.timestamp_millis(),
1375            source: event.source,
1376            action: event.action,
1377        }
1378    }
1379}
1380
1381pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1382    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1383
1384    let mut summer = Sha256::new();
1385    summer.update(checksum_seed);
1386    summer.update(json);
1387    summer.update(checksum_seed);
1388    Some(summer.finalize().into_iter().collect())
1389}
1390
1391fn for_snowflake(
1392    body: EventRequestBody,
1393    first_event_at: chrono::DateTime<chrono::Utc>,
1394) -> impl Iterator<Item = SnowflakeRow> {
1395    body.events.into_iter().map(move |event| SnowflakeRow {
1396        event: match &event.event {
1397            Event::Editor(editor_event) => format!("editor_{}", editor_event.operation),
1398            Event::InlineCompletion(inline_completion_event) => format!(
1399                "inline_completion_{}",
1400                if inline_completion_event.suggestion_accepted {
1401                    "accept "
1402                } else {
1403                    "discard"
1404                }
1405            ),
1406            Event::Call(call_event) => format!("call_{}", call_event.operation.replace(" ", "_")),
1407            Event::Assistant(assistant_event) => {
1408                format!(
1409                    "assistant_{}",
1410                    match assistant_event.phase {
1411                        telemetry_events::AssistantPhase::Response => "response",
1412                        telemetry_events::AssistantPhase::Invoked => "invoke",
1413                        telemetry_events::AssistantPhase::Accepted => "accept",
1414                        telemetry_events::AssistantPhase::Rejected => "reject",
1415                    }
1416                )
1417            }
1418            Event::Cpu(_) => "system_cpu".to_string(),
1419            Event::Memory(_) => "system_memory".to_string(),
1420            Event::App(app_event) => app_event.operation.replace(" ", "_"),
1421            Event::Setting(_) => "setting_change".to_string(),
1422            Event::Extension(_) => "extension_load".to_string(),
1423            Event::Edit(_) => "edit".to_string(),
1424            Event::Action(_) => "command_palette_action".to_string(),
1425            Event::Repl(_) => "repl".to_string(),
1426        },
1427        system_id: body.system_id.clone(),
1428        timestamp: first_event_at + Duration::milliseconds(event.milliseconds_since_first_event),
1429        data: SnowflakeData {
1430            installation_id: body.installation_id.clone(),
1431            session_id: body.session_id.clone(),
1432            metrics_id: body.metrics_id.clone(),
1433            is_staff: body.is_staff,
1434            app_version: body.app_version.clone(),
1435            os_name: body.os_name.clone(),
1436            os_version: body.os_version.clone(),
1437            architecture: body.architecture.clone(),
1438            release_channel: body.release_channel.clone(),
1439            signed_in: event.signed_in,
1440            editor_event: match &event.event {
1441                Event::Editor(editor_event) => Some(editor_event.clone()),
1442                _ => None,
1443            },
1444            inline_completion_event: match &event.event {
1445                Event::InlineCompletion(inline_completion_event) => {
1446                    Some(inline_completion_event.clone())
1447                }
1448                _ => None,
1449            },
1450            call_event: match &event.event {
1451                Event::Call(call_event) => Some(call_event.clone()),
1452                _ => None,
1453            },
1454            assistant_event: match &event.event {
1455                Event::Assistant(assistant_event) => Some(assistant_event.clone()),
1456                _ => None,
1457            },
1458            cpu_event: match &event.event {
1459                Event::Cpu(cpu_event) => Some(cpu_event.clone()),
1460                _ => None,
1461            },
1462            memory_event: match &event.event {
1463                Event::Memory(memory_event) => Some(memory_event.clone()),
1464                _ => None,
1465            },
1466            app_event: match &event.event {
1467                Event::App(app_event) => Some(app_event.clone()),
1468                _ => None,
1469            },
1470            setting_event: match &event.event {
1471                Event::Setting(setting_event) => Some(setting_event.clone()),
1472                _ => None,
1473            },
1474            extension_event: match &event.event {
1475                Event::Extension(extension_event) => Some(extension_event.clone()),
1476                _ => None,
1477            },
1478            edit_event: match &event.event {
1479                Event::Edit(edit_event) => Some(edit_event.clone()),
1480                _ => None,
1481            },
1482            repl_event: match &event.event {
1483                Event::Repl(repl_event) => Some(repl_event.clone()),
1484                _ => None,
1485            },
1486            action_event: match event.event {
1487                Event::Action(action_event) => Some(action_event.clone()),
1488                _ => None,
1489            },
1490        },
1491    })
1492}
1493
/// One event in the shape produced by `for_snowflake`.
#[derive(Serialize, Deserialize)]
struct SnowflakeRow {
    /// Event name derived from the event variant, e.g. `system_cpu`.
    pub event: String,
    /// Copied from the request body's `system_id`.
    pub system_id: Option<String>,
    /// Time of the event: the first-event time plus the event's millisecond offset.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Request-level metadata together with the event payload.
    pub data: SnowflakeData,
}
1501
/// Payload of a `SnowflakeRow`: request-level metadata plus the event itself.
///
/// Every `*_event` field is marked `#[serde(flatten)]`, so the fields of the
/// populated event are serialized inline alongside the metadata rather than
/// under a nested key. `for_snowflake` populates only the field matching the
/// event's variant.
#[derive(Serialize, Deserialize)]
struct SnowflakeData {
    /// Identifier unique to each Zed installation (differs for stable, preview, dev)
    pub installation_id: Option<String>,
    /// Identifier unique to each Zed session (differs for each time you open Zed)
    pub session_id: Option<String>,
    /// Identifier unique to each logged in Zed user (randomly generated on first sign in)
    pub metrics_id: Option<String>,
    /// True for Zed staff, otherwise false
    pub is_staff: Option<bool>,
    /// Zed version number
    pub app_version: String,
    pub os_name: String,
    pub os_version: Option<String>,
    pub architecture: String,
    /// Zed release channel (stable, preview, dev)
    pub release_channel: Option<String>,
    /// Copied from the event wrapper's `signed_in` flag.
    pub signed_in: bool,

    #[serde(flatten)]
    pub editor_event: Option<EditorEvent>,
    #[serde(flatten)]
    pub inline_completion_event: Option<InlineCompletionEvent>,
    #[serde(flatten)]
    pub call_event: Option<CallEvent>,
    #[serde(flatten)]
    pub assistant_event: Option<AssistantEvent>,
    #[serde(flatten)]
    pub cpu_event: Option<CpuEvent>,
    #[serde(flatten)]
    pub memory_event: Option<MemoryEvent>,
    #[serde(flatten)]
    pub app_event: Option<AppEvent>,
    #[serde(flatten)]
    pub setting_event: Option<SettingEvent>,
    #[serde(flatten)]
    pub extension_event: Option<ExtensionEvent>,
    #[serde(flatten)]
    pub edit_event: Option<EditEvent>,
    #[serde(flatten)]
    pub repl_event: Option<ReplEvent>,
    #[serde(flatten)]
    pub action_event: Option<ActionEvent>,
}
1544    pub action_event: Option<ActionEvent>,
1545}