events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::clickhouse::write_to_table;
   4use crate::{api::slack, AppState, Error, Result};
   5use anyhow::{anyhow, Context};
   6use aws_sdk_s3::primitives::ByteStream;
   7use axum::{
   8    body::Bytes,
   9    headers::Header,
  10    http::{HeaderMap, HeaderName, StatusCode},
  11    routing::post,
  12    Extension, Router, TypedHeader,
  13};
  14use rpc::ExtensionMetadata;
  15use semantic_version::SemanticVersion;
  16use serde::{Serialize, Serializer};
  17use sha2::{Digest, Sha256};
  18use std::sync::{Arc, OnceLock};
  19use telemetry_events::{
  20    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  21    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
  22    SettingEvent,
  23};
  24use uuid::Uuid;
  25
/// Name of the S3 bucket that crash (`.ips`) and hang (`.hang.json`) reports
/// are uploaded to by the handlers in this module.
static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  27
  28pub fn router() -> Router {
  29    Router::new()
  30        .route("/telemetry/events", post(post_events))
  31        .route("/telemetry/crashes", post(post_crash))
  32        .route("/telemetry/panics", post(post_panic))
  33        .route("/telemetry/hangs", post(post_hang))
  34}
  35
  36pub struct ZedChecksumHeader(Vec<u8>);
  37
  38impl Header for ZedChecksumHeader {
  39    fn name() -> &'static HeaderName {
  40        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  41        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  42    }
  43
  44    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  45    where
  46        Self: Sized,
  47        I: Iterator<Item = &'i axum::http::HeaderValue>,
  48    {
  49        let checksum = values
  50            .next()
  51            .ok_or_else(axum::headers::Error::invalid)?
  52            .to_str()
  53            .map_err(|_| axum::headers::Error::invalid())?;
  54
  55        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  56        Ok(Self(bytes))
  57    }
  58
  59    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  60        unimplemented!()
  61    }
  62}
  63
  64pub async fn post_crash(
  65    Extension(app): Extension<Arc<AppState>>,
  66    headers: HeaderMap,
  67    body: Bytes,
  68) -> Result<()> {
  69    let report = IpsFile::parse(&body)?;
  70    let version_threshold = SemanticVersion::new(0, 123, 0);
  71
  72    let bundle_id = &report.header.bundle_id;
  73    let app_version = &report.app_version();
  74
  75    if bundle_id == "dev.zed.Zed-Dev" {
  76        log::error!("Crash uploads from {} are ignored.", bundle_id);
  77        return Ok(());
  78    }
  79
  80    if app_version.is_none() || app_version.unwrap() < version_threshold {
  81        log::error!(
  82            "Crash uploads from {} are ignored.",
  83            report.header.app_version
  84        );
  85        return Ok(());
  86    }
  87    let app_version = app_version.unwrap();
  88
  89    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  90        let response = blob_store_client
  91            .head_object()
  92            .bucket(CRASH_REPORTS_BUCKET)
  93            .key(report.header.incident_id.clone() + ".ips")
  94            .send()
  95            .await;
  96
  97        if response.is_ok() {
  98            log::info!("We've already uploaded this crash");
  99            return Ok(());
 100        }
 101
 102        blob_store_client
 103            .put_object()
 104            .bucket(CRASH_REPORTS_BUCKET)
 105            .key(report.header.incident_id.clone() + ".ips")
 106            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 107            .body(ByteStream::from(body.to_vec()))
 108            .send()
 109            .await
 110            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 111            .ok();
 112    }
 113
 114    let recent_panic_on: Option<i64> = headers
 115        .get("x-zed-panicked-on")
 116        .and_then(|h| h.to_str().ok())
 117        .and_then(|s| s.parse().ok());
 118
 119    let installation_id = headers
 120        .get("x-zed-installation-id")
 121        .and_then(|h| h.to_str().ok())
 122        .map(|s| s.to_string())
 123        .unwrap_or_default();
 124
 125    let mut recent_panic = None;
 126
 127    if let Some(recent_panic_on) = recent_panic_on {
 128        let crashed_at = match report.timestamp() {
 129            Ok(t) => Some(t),
 130            Err(e) => {
 131                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 132                None
 133            }
 134        };
 135        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 136            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 137        }
 138    }
 139
 140    let description = report.description(recent_panic);
 141    let summary = report.backtrace_summary();
 142
 143    tracing::error!(
 144        service = "client",
 145        version = %report.header.app_version,
 146        os_version = %report.header.os_version,
 147        bundle_id = %report.header.bundle_id,
 148        incident_id = %report.header.incident_id,
 149        installation_id = %installation_id,
 150        description = %description,
 151        backtrace = %summary,
 152        "crash report"
 153    );
 154
 155    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 156        let payload = slack::WebhookBody::new(|w| {
 157            w.add_section(|s| s.text(slack::Text::markdown(description)))
 158                .add_section(|s| {
 159                    s.add_field(slack::Text::markdown(format!(
 160                        "*Version:*\n{} ({})",
 161                        bundle_id, app_version
 162                    )))
 163                    .add_field({
 164                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 165                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 166                            hostname.strip_prefix("http://").unwrap_or_default()
 167                        });
 168
 169                        slack::Text::markdown(format!(
 170                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 171                            CRASH_REPORTS_BUCKET,
 172                            hostname,
 173                            report.header.incident_id,
 174                            report
 175                                .header
 176                                .incident_id
 177                                .chars()
 178                                .take(8)
 179                                .collect::<String>(),
 180                        ))
 181                    })
 182                })
 183                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 184        });
 185        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 186            log::error!("Failed to serialize payload to JSON: {err}");
 187            Error::Internal(anyhow!(err))
 188        })?;
 189
 190        reqwest::Client::new()
 191            .post(slack_panics_webhook)
 192            .header("Content-Type", "application/json")
 193            .body(payload_json)
 194            .send()
 195            .await
 196            .map_err(|err| {
 197                log::error!("Failed to send payload to Slack: {err}");
 198                Error::Internal(anyhow!(err))
 199            })?;
 200    }
 201
 202    Ok(())
 203}
 204
 205pub async fn post_hang(
 206    Extension(app): Extension<Arc<AppState>>,
 207    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 208    body: Bytes,
 209) -> Result<()> {
 210    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 211        return Err(Error::http(
 212            StatusCode::INTERNAL_SERVER_ERROR,
 213            "events not enabled".into(),
 214        ))?;
 215    };
 216
 217    if checksum != expected {
 218        return Err(Error::http(
 219            StatusCode::BAD_REQUEST,
 220            "invalid checksum".into(),
 221        ))?;
 222    }
 223
 224    let incident_id = Uuid::new_v4().to_string();
 225
 226    // dump JSON into S3 so we can get frame offsets if we need to.
 227    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 228        blob_store_client
 229            .put_object()
 230            .bucket(CRASH_REPORTS_BUCKET)
 231            .key(incident_id.clone() + ".hang.json")
 232            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 233            .body(ByteStream::from(body.to_vec()))
 234            .send()
 235            .await
 236            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 237            .ok();
 238    }
 239
 240    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 241        log::error!("can't parse report json: {err}");
 242        Error::Internal(anyhow!(err))
 243    })?;
 244
 245    let mut backtrace = "Possible hang detected on main thread:".to_string();
 246    let unknown = "<unknown>".to_string();
 247    for frame in report.backtrace.iter() {
 248        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 249    }
 250
 251    tracing::error!(
 252        service = "client",
 253        version = %report.app_version.unwrap_or_default().to_string(),
 254        os_name = %report.os_name,
 255        os_version = report.os_version.unwrap_or_default().to_string(),
 256        incident_id = %incident_id,
 257        installation_id = %report.installation_id.unwrap_or_default(),
 258        backtrace = %backtrace,
 259        "hang report");
 260
 261    Ok(())
 262}
 263
 264pub async fn post_panic(
 265    Extension(app): Extension<Arc<AppState>>,
 266    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 267    body: Bytes,
 268) -> Result<()> {
 269    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 270        return Err(Error::http(
 271            StatusCode::INTERNAL_SERVER_ERROR,
 272            "events not enabled".into(),
 273        ))?;
 274    };
 275
 276    if checksum != expected {
 277        return Err(Error::http(
 278            StatusCode::BAD_REQUEST,
 279            "invalid checksum".into(),
 280        ))?;
 281    }
 282
 283    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 284        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 285    let panic = report.panic;
 286
 287    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 288        return Err(Error::http(
 289            StatusCode::BAD_REQUEST,
 290            "invalid os version".into(),
 291        ))?;
 292    }
 293
 294    tracing::error!(
 295        service = "client",
 296        version = %panic.app_version,
 297        os_name = %panic.os_name,
 298        os_version = %panic.os_version.clone().unwrap_or_default(),
 299        installation_id = %panic.installation_id.unwrap_or_default(),
 300        description = %panic.payload,
 301        backtrace = %panic.backtrace.join("\n"),
 302        "panic report");
 303
 304    let backtrace = if panic.backtrace.len() > 25 {
 305        let total = panic.backtrace.len();
 306        format!(
 307            "{}\n   and {} more",
 308            panic
 309                .backtrace
 310                .iter()
 311                .take(20)
 312                .cloned()
 313                .collect::<Vec<_>>()
 314                .join("\n"),
 315            total - 20
 316        )
 317    } else {
 318        panic.backtrace.join("\n")
 319    };
 320    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 321
 322    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 323        let payload = slack::WebhookBody::new(|w| {
 324            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 325                .add_section(|s| {
 326                    s.add_field(slack::Text::markdown(format!(
 327                        "*Version:*\n {} ",
 328                        panic.app_version
 329                    )))
 330                    .add_field({
 331                        slack::Text::markdown(format!(
 332                            "*OS:*\n{} {}",
 333                            panic.os_name,
 334                            panic.os_version.unwrap_or_default()
 335                        ))
 336                    })
 337                })
 338                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 339        });
 340        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 341            log::error!("Failed to serialize payload to JSON: {err}");
 342            Error::Internal(anyhow!(err))
 343        })?;
 344
 345        reqwest::Client::new()
 346            .post(slack_panics_webhook)
 347            .header("Content-Type", "application/json")
 348            .body(payload_json)
 349            .send()
 350            .await
 351            .map_err(|err| {
 352                log::error!("Failed to send payload to Slack: {err}");
 353                Error::Internal(anyhow!(err))
 354            })?;
 355    }
 356
 357    Ok(())
 358}
 359
 360pub async fn post_events(
 361    Extension(app): Extension<Arc<AppState>>,
 362    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 363    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 364    body: Bytes,
 365) -> Result<()> {
 366    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 367        Err(Error::http(
 368            StatusCode::NOT_IMPLEMENTED,
 369            "not supported".into(),
 370        ))?
 371    };
 372
 373    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 374        return Err(Error::http(
 375            StatusCode::INTERNAL_SERVER_ERROR,
 376            "events not enabled".into(),
 377        ))?;
 378    };
 379
 380    let checksum_matched = checksum == expected;
 381
 382    let request_body: telemetry_events::EventRequestBody =
 383        serde_json::from_slice(&body).map_err(|err| {
 384            log::error!("can't parse event json: {err}");
 385            Error::Internal(anyhow!(err))
 386        })?;
 387
 388    let mut to_upload = ToUpload::default();
 389    let Some(last_event) = request_body.events.last() else {
 390        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 391    };
 392    let country_code = country_code_header.map(|h| h.to_string());
 393
 394    let first_event_at = chrono::Utc::now()
 395        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 396
 397    for wrapper in &request_body.events {
 398        match &wrapper.event {
 399            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 400                event.clone(),
 401                wrapper,
 402                &request_body,
 403                first_event_at,
 404                country_code.clone(),
 405                checksum_matched,
 406            )),
 407            // Needed for clients sending old copilot_event types
 408            Event::Copilot(_) => {}
 409            Event::InlineCompletion(event) => {
 410                to_upload
 411                    .inline_completion_events
 412                    .push(InlineCompletionEventRow::from_event(
 413                        event.clone(),
 414                        wrapper,
 415                        &request_body,
 416                        first_event_at,
 417                        country_code.clone(),
 418                        checksum_matched,
 419                    ))
 420            }
 421            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 422                event.clone(),
 423                wrapper,
 424                &request_body,
 425                first_event_at,
 426                checksum_matched,
 427            )),
 428            Event::Assistant(event) => {
 429                to_upload
 430                    .assistant_events
 431                    .push(AssistantEventRow::from_event(
 432                        event.clone(),
 433                        wrapper,
 434                        &request_body,
 435                        first_event_at,
 436                        checksum_matched,
 437                    ))
 438            }
 439            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 440                event.clone(),
 441                wrapper,
 442                &request_body,
 443                first_event_at,
 444                checksum_matched,
 445            )),
 446            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 447                event.clone(),
 448                wrapper,
 449                &request_body,
 450                first_event_at,
 451                checksum_matched,
 452            )),
 453            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 454                event.clone(),
 455                wrapper,
 456                &request_body,
 457                first_event_at,
 458                checksum_matched,
 459            )),
 460            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 461                event.clone(),
 462                wrapper,
 463                &request_body,
 464                first_event_at,
 465                checksum_matched,
 466            )),
 467            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 468                event.clone(),
 469                wrapper,
 470                &request_body,
 471                first_event_at,
 472                checksum_matched,
 473            )),
 474            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 475                event.clone(),
 476                wrapper,
 477                &request_body,
 478                first_event_at,
 479                checksum_matched,
 480            )),
 481            Event::Extension(event) => {
 482                let metadata = app
 483                    .db
 484                    .get_extension_version(&event.extension_id, &event.version)
 485                    .await?;
 486                to_upload
 487                    .extension_events
 488                    .push(ExtensionEventRow::from_event(
 489                        event.clone(),
 490                        wrapper,
 491                        &request_body,
 492                        metadata,
 493                        first_event_at,
 494                        checksum_matched,
 495                    ))
 496            }
 497            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 498                event.clone(),
 499                wrapper,
 500                &request_body,
 501                first_event_at,
 502                checksum_matched,
 503            )),
 504        }
 505    }
 506
 507    to_upload
 508        .upload(&clickhouse_client)
 509        .await
 510        .map_err(|err| Error::Internal(anyhow!(err)))?;
 511
 512    Ok(())
 513}
 514
 515#[derive(Default)]
 516struct ToUpload {
 517    editor_events: Vec<EditorEventRow>,
 518    inline_completion_events: Vec<InlineCompletionEventRow>,
 519    assistant_events: Vec<AssistantEventRow>,
 520    call_events: Vec<CallEventRow>,
 521    cpu_events: Vec<CpuEventRow>,
 522    memory_events: Vec<MemoryEventRow>,
 523    app_events: Vec<AppEventRow>,
 524    setting_events: Vec<SettingEventRow>,
 525    extension_events: Vec<ExtensionEventRow>,
 526    edit_events: Vec<EditEventRow>,
 527    action_events: Vec<ActionEventRow>,
 528    repl_events: Vec<ReplEventRow>,
 529}
 530
 531impl ToUpload {
 532    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 533        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 534        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 535            .await
 536            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 537
 538        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 539        write_to_table(
 540            INLINE_COMPLETION_EVENTS_TABLE,
 541            &self.inline_completion_events,
 542            clickhouse_client,
 543        )
 544        .await
 545        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 546
 547        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 548        write_to_table(
 549            ASSISTANT_EVENTS_TABLE,
 550            &self.assistant_events,
 551            clickhouse_client,
 552        )
 553        .await
 554        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 555
 556        const CALL_EVENTS_TABLE: &str = "call_events";
 557        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 558            .await
 559            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 560
 561        const CPU_EVENTS_TABLE: &str = "cpu_events";
 562        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 563            .await
 564            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 565
 566        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 567        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 568            .await
 569            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 570
 571        const APP_EVENTS_TABLE: &str = "app_events";
 572        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 573            .await
 574            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 575
 576        const SETTING_EVENTS_TABLE: &str = "setting_events";
 577        write_to_table(
 578            SETTING_EVENTS_TABLE,
 579            &self.setting_events,
 580            clickhouse_client,
 581        )
 582        .await
 583        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 584
 585        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 586        write_to_table(
 587            EXTENSION_EVENTS_TABLE,
 588            &self.extension_events,
 589            clickhouse_client,
 590        )
 591        .await
 592        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 593
 594        const EDIT_EVENTS_TABLE: &str = "edit_events";
 595        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 596            .await
 597            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 598
 599        const ACTION_EVENTS_TABLE: &str = "action_events";
 600        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 601            .await
 602            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 603
 604        const REPL_EVENTS_TABLE: &str = "repl_events";
 605        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 606            .await
 607            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 608
 609        Ok(())
 610    }
 611}
 612
 613pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 614where
 615    S: Serializer,
 616{
 617    if country_code.len() != 2 {
 618        use serde::ser::Error;
 619        return Err(S::Error::custom(
 620            "country_code must be exactly 2 characters",
 621        ));
 622    }
 623
 624    let country_code = country_code.as_bytes();
 625
 626    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 627}
 628
 629#[derive(Serialize, Debug, clickhouse::Row)]
 630pub struct EditorEventRow {
 631    system_id: String,
 632    installation_id: String,
 633    session_id: Option<String>,
 634    metrics_id: String,
 635    operation: String,
 636    app_version: String,
 637    file_extension: String,
 638    os_name: String,
 639    os_version: String,
 640    release_channel: String,
 641    signed_in: bool,
 642    vim_mode: bool,
 643    #[serde(serialize_with = "serialize_country_code")]
 644    country_code: String,
 645    region_code: String,
 646    city: String,
 647    time: i64,
 648    copilot_enabled: bool,
 649    copilot_enabled_for_language: bool,
 650    historical_event: bool,
 651    architecture: String,
 652    is_staff: Option<bool>,
 653    major: Option<i32>,
 654    minor: Option<i32>,
 655    patch: Option<i32>,
 656    checksum_matched: bool,
 657}
 658
 659impl EditorEventRow {
 660    fn from_event(
 661        event: EditorEvent,
 662        wrapper: &EventWrapper,
 663        body: &EventRequestBody,
 664        first_event_at: chrono::DateTime<chrono::Utc>,
 665        country_code: Option<String>,
 666        checksum_matched: bool,
 667    ) -> Self {
 668        let semver = body.semver();
 669        let time =
 670            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 671
 672        Self {
 673            app_version: body.app_version.clone(),
 674            major: semver.map(|v| v.major() as i32),
 675            minor: semver.map(|v| v.minor() as i32),
 676            patch: semver.map(|v| v.patch() as i32),
 677            checksum_matched,
 678            release_channel: body.release_channel.clone().unwrap_or_default(),
 679            os_name: body.os_name.clone(),
 680            os_version: body.os_version.clone().unwrap_or_default(),
 681            architecture: body.architecture.clone(),
 682            system_id: body.system_id.clone().unwrap_or_default(),
 683            installation_id: body.installation_id.clone().unwrap_or_default(),
 684            session_id: body.session_id.clone(),
 685            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 686            is_staff: body.is_staff,
 687            time: time.timestamp_millis(),
 688            operation: event.operation,
 689            file_extension: event.file_extension.unwrap_or_default(),
 690            signed_in: wrapper.signed_in,
 691            vim_mode: event.vim_mode,
 692            copilot_enabled: event.copilot_enabled,
 693            copilot_enabled_for_language: event.copilot_enabled_for_language,
 694            country_code: country_code.unwrap_or("XX".to_string()),
 695            region_code: "".to_string(),
 696            city: "".to_string(),
 697            historical_event: false,
 698        }
 699    }
 700}
 701
 702#[derive(Serialize, Debug, clickhouse::Row)]
 703pub struct InlineCompletionEventRow {
 704    installation_id: String,
 705    session_id: Option<String>,
 706    provider: String,
 707    suggestion_accepted: bool,
 708    app_version: String,
 709    file_extension: String,
 710    os_name: String,
 711    os_version: String,
 712    release_channel: String,
 713    signed_in: bool,
 714    #[serde(serialize_with = "serialize_country_code")]
 715    country_code: String,
 716    region_code: String,
 717    city: String,
 718    time: i64,
 719    is_staff: Option<bool>,
 720    major: Option<i32>,
 721    minor: Option<i32>,
 722    patch: Option<i32>,
 723    checksum_matched: bool,
 724}
 725
 726impl InlineCompletionEventRow {
 727    fn from_event(
 728        event: InlineCompletionEvent,
 729        wrapper: &EventWrapper,
 730        body: &EventRequestBody,
 731        first_event_at: chrono::DateTime<chrono::Utc>,
 732        country_code: Option<String>,
 733        checksum_matched: bool,
 734    ) -> Self {
 735        let semver = body.semver();
 736        let time =
 737            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 738
 739        Self {
 740            app_version: body.app_version.clone(),
 741            major: semver.map(|v| v.major() as i32),
 742            minor: semver.map(|v| v.minor() as i32),
 743            patch: semver.map(|v| v.patch() as i32),
 744            checksum_matched,
 745            release_channel: body.release_channel.clone().unwrap_or_default(),
 746            os_name: body.os_name.clone(),
 747            os_version: body.os_version.clone().unwrap_or_default(),
 748            installation_id: body.installation_id.clone().unwrap_or_default(),
 749            session_id: body.session_id.clone(),
 750            is_staff: body.is_staff,
 751            time: time.timestamp_millis(),
 752            file_extension: event.file_extension.unwrap_or_default(),
 753            signed_in: wrapper.signed_in,
 754            country_code: country_code.unwrap_or("XX".to_string()),
 755            region_code: "".to_string(),
 756            city: "".to_string(),
 757            provider: event.provider,
 758            suggestion_accepted: event.suggestion_accepted,
 759        }
 760    }
 761}
 762
 763#[derive(Serialize, Debug, clickhouse::Row)]
 764pub struct CallEventRow {
 765    // AppInfoBase
 766    app_version: String,
 767    major: Option<i32>,
 768    minor: Option<i32>,
 769    patch: Option<i32>,
 770    release_channel: String,
 771    os_name: String,
 772    os_version: String,
 773    checksum_matched: bool,
 774
 775    // ClientEventBase
 776    installation_id: String,
 777    session_id: Option<String>,
 778    is_staff: Option<bool>,
 779    time: i64,
 780
 781    // CallEventRow
 782    operation: String,
 783    room_id: Option<u64>,
 784    channel_id: Option<u64>,
 785}
 786
 787impl CallEventRow {
 788    fn from_event(
 789        event: CallEvent,
 790        wrapper: &EventWrapper,
 791        body: &EventRequestBody,
 792        first_event_at: chrono::DateTime<chrono::Utc>,
 793        checksum_matched: bool,
 794    ) -> Self {
 795        let semver = body.semver();
 796        let time =
 797            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 798
 799        Self {
 800            app_version: body.app_version.clone(),
 801            major: semver.map(|v| v.major() as i32),
 802            minor: semver.map(|v| v.minor() as i32),
 803            patch: semver.map(|v| v.patch() as i32),
 804            checksum_matched,
 805            release_channel: body.release_channel.clone().unwrap_or_default(),
 806            os_name: body.os_name.clone(),
 807            os_version: body.os_version.clone().unwrap_or_default(),
 808            installation_id: body.installation_id.clone().unwrap_or_default(),
 809            session_id: body.session_id.clone(),
 810            is_staff: body.is_staff,
 811            time: time.timestamp_millis(),
 812            operation: event.operation,
 813            room_id: event.room_id,
 814            channel_id: event.channel_id,
 815        }
 816    }
 817}
 818
 819#[derive(Serialize, Debug, clickhouse::Row)]
 820pub struct AssistantEventRow {
 821    // AppInfoBase
 822    app_version: String,
 823    major: Option<i32>,
 824    minor: Option<i32>,
 825    patch: Option<i32>,
 826    checksum_matched: bool,
 827    release_channel: String,
 828    os_name: String,
 829    os_version: String,
 830
 831    // ClientEventBase
 832    installation_id: Option<String>,
 833    session_id: Option<String>,
 834    is_staff: Option<bool>,
 835    time: i64,
 836
 837    // AssistantEventRow
 838    conversation_id: String,
 839    kind: String,
 840    phase: String,
 841    model: String,
 842    response_latency_in_ms: Option<i64>,
 843    error_message: Option<String>,
 844}
 845
 846impl AssistantEventRow {
 847    fn from_event(
 848        event: AssistantEvent,
 849        wrapper: &EventWrapper,
 850        body: &EventRequestBody,
 851        first_event_at: chrono::DateTime<chrono::Utc>,
 852        checksum_matched: bool,
 853    ) -> Self {
 854        let semver = body.semver();
 855        let time =
 856            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 857
 858        Self {
 859            app_version: body.app_version.clone(),
 860            major: semver.map(|v| v.major() as i32),
 861            minor: semver.map(|v| v.minor() as i32),
 862            patch: semver.map(|v| v.patch() as i32),
 863            checksum_matched,
 864            release_channel: body.release_channel.clone().unwrap_or_default(),
 865            os_name: body.os_name.clone(),
 866            os_version: body.os_version.clone().unwrap_or_default(),
 867            installation_id: body.installation_id.clone(),
 868            session_id: body.session_id.clone(),
 869            is_staff: body.is_staff,
 870            time: time.timestamp_millis(),
 871            conversation_id: event.conversation_id.unwrap_or_default(),
 872            kind: event.kind.to_string(),
 873            phase: event.phase.to_string(),
 874            model: event.model,
 875            response_latency_in_ms: event
 876                .response_latency
 877                .map(|latency| latency.as_millis() as i64),
 878            error_message: event.error_message,
 879        }
 880    }
 881}
 882
 883#[derive(Debug, clickhouse::Row, Serialize)]
 884pub struct CpuEventRow {
 885    system_id: Option<String>,
 886    installation_id: Option<String>,
 887    session_id: Option<String>,
 888    is_staff: Option<bool>,
 889    usage_as_percentage: f32,
 890    core_count: u32,
 891    app_version: String,
 892    release_channel: String,
 893    os_name: String,
 894    os_version: String,
 895    time: i64,
 896    // pub normalized_cpu_usage: f64, MATERIALIZED
 897    major: Option<i32>,
 898    minor: Option<i32>,
 899    patch: Option<i32>,
 900    checksum_matched: bool,
 901}
 902
 903impl CpuEventRow {
 904    fn from_event(
 905        event: CpuEvent,
 906        wrapper: &EventWrapper,
 907        body: &EventRequestBody,
 908        first_event_at: chrono::DateTime<chrono::Utc>,
 909        checksum_matched: bool,
 910    ) -> Self {
 911        let semver = body.semver();
 912        let time =
 913            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 914
 915        Self {
 916            app_version: body.app_version.clone(),
 917            major: semver.map(|v| v.major() as i32),
 918            minor: semver.map(|v| v.minor() as i32),
 919            patch: semver.map(|v| v.patch() as i32),
 920            checksum_matched,
 921            release_channel: body.release_channel.clone().unwrap_or_default(),
 922            os_name: body.os_name.clone(),
 923            os_version: body.os_version.clone().unwrap_or_default(),
 924            system_id: body.system_id.clone(),
 925            installation_id: body.installation_id.clone(),
 926            session_id: body.session_id.clone(),
 927            is_staff: body.is_staff,
 928            time: time.timestamp_millis(),
 929            usage_as_percentage: event.usage_as_percentage,
 930            core_count: event.core_count,
 931        }
 932    }
 933}
 934
 935#[derive(Serialize, Debug, clickhouse::Row)]
 936pub struct MemoryEventRow {
 937    // AppInfoBase
 938    app_version: String,
 939    major: Option<i32>,
 940    minor: Option<i32>,
 941    patch: Option<i32>,
 942    checksum_matched: bool,
 943    release_channel: String,
 944    os_name: String,
 945    os_version: String,
 946
 947    // ClientEventBase
 948    system_id: Option<String>,
 949    installation_id: Option<String>,
 950    session_id: Option<String>,
 951    is_staff: Option<bool>,
 952    time: i64,
 953
 954    // MemoryEventRow
 955    memory_in_bytes: u64,
 956    virtual_memory_in_bytes: u64,
 957}
 958
 959impl MemoryEventRow {
 960    fn from_event(
 961        event: MemoryEvent,
 962        wrapper: &EventWrapper,
 963        body: &EventRequestBody,
 964        first_event_at: chrono::DateTime<chrono::Utc>,
 965        checksum_matched: bool,
 966    ) -> Self {
 967        let semver = body.semver();
 968        let time =
 969            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 970
 971        Self {
 972            app_version: body.app_version.clone(),
 973            major: semver.map(|v| v.major() as i32),
 974            minor: semver.map(|v| v.minor() as i32),
 975            patch: semver.map(|v| v.patch() as i32),
 976            checksum_matched,
 977            release_channel: body.release_channel.clone().unwrap_or_default(),
 978            os_name: body.os_name.clone(),
 979            os_version: body.os_version.clone().unwrap_or_default(),
 980            system_id: body.system_id.clone(),
 981            installation_id: body.installation_id.clone(),
 982            session_id: body.session_id.clone(),
 983            is_staff: body.is_staff,
 984            time: time.timestamp_millis(),
 985            memory_in_bytes: event.memory_in_bytes,
 986            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 987        }
 988    }
 989}
 990
 991#[derive(Serialize, Debug, clickhouse::Row)]
 992pub struct AppEventRow {
 993    // AppInfoBase
 994    app_version: String,
 995    major: Option<i32>,
 996    minor: Option<i32>,
 997    patch: Option<i32>,
 998    checksum_matched: bool,
 999    release_channel: String,
1000    os_name: String,
1001    os_version: String,
1002
1003    // ClientEventBase
1004    system_id: Option<String>,
1005    installation_id: Option<String>,
1006    session_id: Option<String>,
1007    is_staff: Option<bool>,
1008    time: i64,
1009
1010    // AppEventRow
1011    operation: String,
1012}
1013
1014impl AppEventRow {
1015    fn from_event(
1016        event: AppEvent,
1017        wrapper: &EventWrapper,
1018        body: &EventRequestBody,
1019        first_event_at: chrono::DateTime<chrono::Utc>,
1020        checksum_matched: bool,
1021    ) -> Self {
1022        let semver = body.semver();
1023        let time =
1024            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1025
1026        Self {
1027            app_version: body.app_version.clone(),
1028            major: semver.map(|v| v.major() as i32),
1029            minor: semver.map(|v| v.minor() as i32),
1030            patch: semver.map(|v| v.patch() as i32),
1031            checksum_matched,
1032            release_channel: body.release_channel.clone().unwrap_or_default(),
1033            os_name: body.os_name.clone(),
1034            os_version: body.os_version.clone().unwrap_or_default(),
1035            system_id: body.system_id.clone(),
1036            installation_id: body.installation_id.clone(),
1037            session_id: body.session_id.clone(),
1038            is_staff: body.is_staff,
1039            time: time.timestamp_millis(),
1040            operation: event.operation,
1041        }
1042    }
1043}
1044
1045#[derive(Serialize, Debug, clickhouse::Row)]
1046pub struct SettingEventRow {
1047    // AppInfoBase
1048    app_version: String,
1049    major: Option<i32>,
1050    minor: Option<i32>,
1051    patch: Option<i32>,
1052    checksum_matched: bool,
1053    release_channel: String,
1054    os_name: String,
1055    os_version: String,
1056
1057    // ClientEventBase
1058    system_id: Option<String>,
1059    installation_id: Option<String>,
1060    session_id: Option<String>,
1061    is_staff: Option<bool>,
1062    time: i64,
1063    // SettingEventRow
1064    setting: String,
1065    value: String,
1066}
1067
1068impl SettingEventRow {
1069    fn from_event(
1070        event: SettingEvent,
1071        wrapper: &EventWrapper,
1072        body: &EventRequestBody,
1073        first_event_at: chrono::DateTime<chrono::Utc>,
1074        checksum_matched: bool,
1075    ) -> Self {
1076        let semver = body.semver();
1077        let time =
1078            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1079
1080        Self {
1081            app_version: body.app_version.clone(),
1082            major: semver.map(|v| v.major() as i32),
1083            minor: semver.map(|v| v.minor() as i32),
1084            checksum_matched,
1085            patch: semver.map(|v| v.patch() as i32),
1086            release_channel: body.release_channel.clone().unwrap_or_default(),
1087            os_name: body.os_name.clone(),
1088            os_version: body.os_version.clone().unwrap_or_default(),
1089            system_id: body.system_id.clone(),
1090            installation_id: body.installation_id.clone(),
1091            session_id: body.session_id.clone(),
1092            is_staff: body.is_staff,
1093            time: time.timestamp_millis(),
1094            setting: event.setting,
1095            value: event.value,
1096        }
1097    }
1098}
1099
1100#[derive(Serialize, Debug, clickhouse::Row)]
1101pub struct ExtensionEventRow {
1102    // AppInfoBase
1103    app_version: String,
1104    major: Option<i32>,
1105    minor: Option<i32>,
1106    patch: Option<i32>,
1107    checksum_matched: bool,
1108    release_channel: String,
1109    os_name: String,
1110    os_version: String,
1111
1112    // ClientEventBase
1113    system_id: Option<String>,
1114    installation_id: Option<String>,
1115    session_id: Option<String>,
1116    is_staff: Option<bool>,
1117    time: i64,
1118
1119    // ExtensionEventRow
1120    extension_id: Arc<str>,
1121    extension_version: Arc<str>,
1122    dev: bool,
1123    schema_version: Option<i32>,
1124    wasm_api_version: Option<String>,
1125}
1126
1127impl ExtensionEventRow {
1128    fn from_event(
1129        event: ExtensionEvent,
1130        wrapper: &EventWrapper,
1131        body: &EventRequestBody,
1132        extension_metadata: Option<ExtensionMetadata>,
1133        first_event_at: chrono::DateTime<chrono::Utc>,
1134        checksum_matched: bool,
1135    ) -> Self {
1136        let semver = body.semver();
1137        let time =
1138            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1139
1140        Self {
1141            app_version: body.app_version.clone(),
1142            major: semver.map(|v| v.major() as i32),
1143            minor: semver.map(|v| v.minor() as i32),
1144            patch: semver.map(|v| v.patch() as i32),
1145            checksum_matched,
1146            release_channel: body.release_channel.clone().unwrap_or_default(),
1147            os_name: body.os_name.clone(),
1148            os_version: body.os_version.clone().unwrap_or_default(),
1149            system_id: body.system_id.clone(),
1150            installation_id: body.installation_id.clone(),
1151            session_id: body.session_id.clone(),
1152            is_staff: body.is_staff,
1153            time: time.timestamp_millis(),
1154            extension_id: event.extension_id,
1155            extension_version: event.version,
1156            dev: extension_metadata.is_none(),
1157            schema_version: extension_metadata
1158                .as_ref()
1159                .and_then(|metadata| metadata.manifest.schema_version),
1160            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1161                metadata
1162                    .manifest
1163                    .wasm_api_version
1164                    .as_ref()
1165                    .map(|version| version.to_string())
1166            }),
1167        }
1168    }
1169}
1170
1171#[derive(Serialize, Debug, clickhouse::Row)]
1172pub struct ReplEventRow {
1173    // AppInfoBase
1174    app_version: String,
1175    major: Option<i32>,
1176    minor: Option<i32>,
1177    patch: Option<i32>,
1178    checksum_matched: bool,
1179    release_channel: String,
1180    os_name: String,
1181    os_version: String,
1182
1183    // ClientEventBase
1184    installation_id: Option<String>,
1185    session_id: Option<String>,
1186    is_staff: Option<bool>,
1187    time: i64,
1188
1189    // ReplEventRow
1190    kernel_language: String,
1191    kernel_status: String,
1192    repl_session_id: String,
1193}
1194
1195impl ReplEventRow {
1196    fn from_event(
1197        event: ReplEvent,
1198        wrapper: &EventWrapper,
1199        body: &EventRequestBody,
1200        first_event_at: chrono::DateTime<chrono::Utc>,
1201        checksum_matched: bool,
1202    ) -> Self {
1203        let semver = body.semver();
1204        let time =
1205            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1206
1207        Self {
1208            app_version: body.app_version.clone(),
1209            major: semver.map(|v| v.major() as i32),
1210            minor: semver.map(|v| v.minor() as i32),
1211            patch: semver.map(|v| v.patch() as i32),
1212            checksum_matched,
1213            release_channel: body.release_channel.clone().unwrap_or_default(),
1214            os_name: body.os_name.clone(),
1215            os_version: body.os_version.clone().unwrap_or_default(),
1216            installation_id: body.installation_id.clone(),
1217            session_id: body.session_id.clone(),
1218            is_staff: body.is_staff,
1219            time: time.timestamp_millis(),
1220            kernel_language: event.kernel_language,
1221            kernel_status: event.kernel_status,
1222            repl_session_id: event.repl_session_id,
1223        }
1224    }
1225}
1226
1227#[derive(Serialize, Debug, clickhouse::Row)]
1228pub struct EditEventRow {
1229    // AppInfoBase
1230    app_version: String,
1231    major: Option<i32>,
1232    minor: Option<i32>,
1233    patch: Option<i32>,
1234    checksum_matched: bool,
1235    release_channel: String,
1236    os_name: String,
1237    os_version: String,
1238
1239    // ClientEventBase
1240    system_id: Option<String>,
1241    installation_id: Option<String>,
1242    // Note: This column name has a typo in the ClickHouse table.
1243    #[serde(rename = "sesssion_id")]
1244    session_id: Option<String>,
1245    is_staff: Option<bool>,
1246    time: i64,
1247
1248    // EditEventRow
1249    period_start: i64,
1250    period_end: i64,
1251    environment: String,
1252}
1253
1254impl EditEventRow {
1255    fn from_event(
1256        event: EditEvent,
1257        wrapper: &EventWrapper,
1258        body: &EventRequestBody,
1259        first_event_at: chrono::DateTime<chrono::Utc>,
1260        checksum_matched: bool,
1261    ) -> Self {
1262        let semver = body.semver();
1263        let time =
1264            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1265
1266        let period_start = time - chrono::Duration::milliseconds(event.duration);
1267        let period_end = time;
1268
1269        Self {
1270            app_version: body.app_version.clone(),
1271            major: semver.map(|v| v.major() as i32),
1272            minor: semver.map(|v| v.minor() as i32),
1273            patch: semver.map(|v| v.patch() as i32),
1274            checksum_matched,
1275            release_channel: body.release_channel.clone().unwrap_or_default(),
1276            os_name: body.os_name.clone(),
1277            os_version: body.os_version.clone().unwrap_or_default(),
1278            system_id: body.system_id.clone(),
1279            installation_id: body.installation_id.clone(),
1280            session_id: body.session_id.clone(),
1281            is_staff: body.is_staff,
1282            time: time.timestamp_millis(),
1283            period_start: period_start.timestamp_millis(),
1284            period_end: period_end.timestamp_millis(),
1285            environment: event.environment,
1286        }
1287    }
1288}
1289
1290#[derive(Serialize, Debug, clickhouse::Row)]
1291pub struct ActionEventRow {
1292    // AppInfoBase
1293    app_version: String,
1294    major: Option<i32>,
1295    minor: Option<i32>,
1296    patch: Option<i32>,
1297    checksum_matched: bool,
1298    release_channel: String,
1299    os_name: String,
1300    os_version: String,
1301
1302    // ClientEventBase
1303    installation_id: Option<String>,
1304    // Note: This column name has a typo in the ClickHouse table.
1305    #[serde(rename = "sesssion_id")]
1306    session_id: Option<String>,
1307    is_staff: Option<bool>,
1308    time: i64,
1309    // ActionEventRow
1310    source: String,
1311    action: String,
1312}
1313
1314impl ActionEventRow {
1315    fn from_event(
1316        event: ActionEvent,
1317        wrapper: &EventWrapper,
1318        body: &EventRequestBody,
1319        first_event_at: chrono::DateTime<chrono::Utc>,
1320        checksum_matched: bool,
1321    ) -> Self {
1322        let semver = body.semver();
1323        let time =
1324            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1325
1326        Self {
1327            app_version: body.app_version.clone(),
1328            major: semver.map(|v| v.major() as i32),
1329            minor: semver.map(|v| v.minor() as i32),
1330            patch: semver.map(|v| v.patch() as i32),
1331            checksum_matched,
1332            release_channel: body.release_channel.clone().unwrap_or_default(),
1333            os_name: body.os_name.clone(),
1334            os_version: body.os_version.clone().unwrap_or_default(),
1335            installation_id: body.installation_id.clone(),
1336            session_id: body.session_id.clone(),
1337            is_staff: body.is_staff,
1338            time: time.timestamp_millis(),
1339            source: event.source,
1340            action: event.action,
1341        }
1342    }
1343}
1344
1345pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1346    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1347
1348    let mut summer = Sha256::new();
1349    summer.update(checksum_seed);
1350    summer.update(json);
1351    summer.update(checksum_seed);
1352    Some(summer.finalize().into_iter().collect())
1353}