events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::clickhouse::write_to_table;
   4use crate::{api::slack, AppState, Error, Result};
   5use anyhow::{anyhow, Context};
   6use aws_sdk_s3::primitives::ByteStream;
   7use axum::{
   8    body::Bytes,
   9    headers::Header,
  10    http::{HeaderMap, HeaderName, StatusCode},
  11    routing::post,
  12    Extension, Router, TypedHeader,
  13};
  14use rpc::ExtensionMetadata;
  15use semantic_version::SemanticVersion;
  16use serde::{Serialize, Serializer};
  17use sha2::{Digest, Sha256};
  18use std::sync::{Arc, OnceLock};
  19use telemetry_events::{
  20    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  21    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
  22    SettingEvent,
  23};
  24use uuid::Uuid;
  25
  26static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  27
  28pub fn router() -> Router {
  29    Router::new()
  30        .route("/telemetry/events", post(post_events))
  31        .route("/telemetry/crashes", post(post_crash))
  32        .route("/telemetry/panics", post(post_panic))
  33        .route("/telemetry/hangs", post(post_hang))
  34}
  35
  36pub struct ZedChecksumHeader(Vec<u8>);
  37
  38impl Header for ZedChecksumHeader {
  39    fn name() -> &'static HeaderName {
  40        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  41        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  42    }
  43
  44    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  45    where
  46        Self: Sized,
  47        I: Iterator<Item = &'i axum::http::HeaderValue>,
  48    {
  49        let checksum = values
  50            .next()
  51            .ok_or_else(axum::headers::Error::invalid)?
  52            .to_str()
  53            .map_err(|_| axum::headers::Error::invalid())?;
  54
  55        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  56        Ok(Self(bytes))
  57    }
  58
  59    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  60        unimplemented!()
  61    }
  62}
  63
  64pub async fn post_crash(
  65    Extension(app): Extension<Arc<AppState>>,
  66    headers: HeaderMap,
  67    body: Bytes,
  68) -> Result<()> {
  69    let report = IpsFile::parse(&body)?;
  70    let version_threshold = SemanticVersion::new(0, 123, 0);
  71
  72    let bundle_id = &report.header.bundle_id;
  73    let app_version = &report.app_version();
  74
  75    if bundle_id == "dev.zed.Zed-Dev" {
  76        log::error!("Crash uploads from {} are ignored.", bundle_id);
  77        return Ok(());
  78    }
  79
  80    if app_version.is_none() || app_version.unwrap() < version_threshold {
  81        log::error!(
  82            "Crash uploads from {} are ignored.",
  83            report.header.app_version
  84        );
  85        return Ok(());
  86    }
  87    let app_version = app_version.unwrap();
  88
  89    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  90        let response = blob_store_client
  91            .head_object()
  92            .bucket(CRASH_REPORTS_BUCKET)
  93            .key(report.header.incident_id.clone() + ".ips")
  94            .send()
  95            .await;
  96
  97        if response.is_ok() {
  98            log::info!("We've already uploaded this crash");
  99            return Ok(());
 100        }
 101
 102        blob_store_client
 103            .put_object()
 104            .bucket(CRASH_REPORTS_BUCKET)
 105            .key(report.header.incident_id.clone() + ".ips")
 106            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 107            .body(ByteStream::from(body.to_vec()))
 108            .send()
 109            .await
 110            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 111            .ok();
 112    }
 113
 114    let recent_panic_on: Option<i64> = headers
 115        .get("x-zed-panicked-on")
 116        .and_then(|h| h.to_str().ok())
 117        .and_then(|s| s.parse().ok());
 118
 119    let installation_id = headers
 120        .get("x-zed-installation-id")
 121        .and_then(|h| h.to_str().ok())
 122        .map(|s| s.to_string())
 123        .unwrap_or_default();
 124
 125    let mut recent_panic = None;
 126
 127    if let Some(recent_panic_on) = recent_panic_on {
 128        let crashed_at = match report.timestamp() {
 129            Ok(t) => Some(t),
 130            Err(e) => {
 131                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 132                None
 133            }
 134        };
 135        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 136            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 137        }
 138    }
 139
 140    let description = report.description(recent_panic);
 141    let summary = report.backtrace_summary();
 142
 143    tracing::error!(
 144        service = "client",
 145        version = %report.header.app_version,
 146        os_version = %report.header.os_version,
 147        bundle_id = %report.header.bundle_id,
 148        incident_id = %report.header.incident_id,
 149        installation_id = %installation_id,
 150        description = %description,
 151        backtrace = %summary,
 152        "crash report");
 153
 154    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 155        let payload = slack::WebhookBody::new(|w| {
 156            w.add_section(|s| s.text(slack::Text::markdown(description)))
 157                .add_section(|s| {
 158                    s.add_field(slack::Text::markdown(format!(
 159                        "*Version:*\n{} ({})",
 160                        bundle_id, app_version
 161                    )))
 162                    .add_field({
 163                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 164                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 165                            hostname.strip_prefix("http://").unwrap_or_default()
 166                        });
 167
 168                        slack::Text::markdown(format!(
 169                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 170                            CRASH_REPORTS_BUCKET,
 171                            hostname,
 172                            report.header.incident_id,
 173                            report
 174                                .header
 175                                .incident_id
 176                                .chars()
 177                                .take(8)
 178                                .collect::<String>(),
 179                        ))
 180                    })
 181                })
 182                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 183        });
 184        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 185            log::error!("Failed to serialize payload to JSON: {err}");
 186            Error::Internal(anyhow!(err))
 187        })?;
 188
 189        reqwest::Client::new()
 190            .post(slack_panics_webhook)
 191            .header("Content-Type", "application/json")
 192            .body(payload_json)
 193            .send()
 194            .await
 195            .map_err(|err| {
 196                log::error!("Failed to send payload to Slack: {err}");
 197                Error::Internal(anyhow!(err))
 198            })?;
 199    }
 200
 201    Ok(())
 202}
 203
 204pub async fn post_hang(
 205    Extension(app): Extension<Arc<AppState>>,
 206    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 207    body: Bytes,
 208) -> Result<()> {
 209    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 210        return Err(Error::http(
 211            StatusCode::INTERNAL_SERVER_ERROR,
 212            "events not enabled".into(),
 213        ))?;
 214    };
 215
 216    if checksum != expected {
 217        return Err(Error::http(
 218            StatusCode::BAD_REQUEST,
 219            "invalid checksum".into(),
 220        ))?;
 221    }
 222
 223    let incident_id = Uuid::new_v4().to_string();
 224
 225    // dump JSON into S3 so we can get frame offsets if we need to.
 226    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 227        blob_store_client
 228            .put_object()
 229            .bucket(CRASH_REPORTS_BUCKET)
 230            .key(incident_id.clone() + ".hang.json")
 231            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 232            .body(ByteStream::from(body.to_vec()))
 233            .send()
 234            .await
 235            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 236            .ok();
 237    }
 238
 239    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 240        log::error!("can't parse report json: {err}");
 241        Error::Internal(anyhow!(err))
 242    })?;
 243
 244    let mut backtrace = "Possible hang detected on main thread:".to_string();
 245    let unknown = "<unknown>".to_string();
 246    for frame in report.backtrace.iter() {
 247        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 248    }
 249
 250    tracing::error!(
 251        service = "client",
 252        version = %report.app_version.unwrap_or_default().to_string(),
 253        os_name = %report.os_name,
 254        os_version = report.os_version.unwrap_or_default().to_string(),
 255        incident_id = %incident_id,
 256        installation_id = %report.installation_id.unwrap_or_default(),
 257        backtrace = %backtrace,
 258        "hang report");
 259
 260    Ok(())
 261}
 262
 263pub async fn post_panic(
 264    Extension(app): Extension<Arc<AppState>>,
 265    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 266    body: Bytes,
 267) -> Result<()> {
 268    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 269        return Err(Error::http(
 270            StatusCode::INTERNAL_SERVER_ERROR,
 271            "events not enabled".into(),
 272        ))?;
 273    };
 274
 275    if checksum != expected {
 276        return Err(Error::http(
 277            StatusCode::BAD_REQUEST,
 278            "invalid checksum".into(),
 279        ))?;
 280    }
 281
 282    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 283        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 284    let panic = report.panic;
 285
 286    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 287        return Err(Error::http(
 288            StatusCode::BAD_REQUEST,
 289            "invalid os version".into(),
 290        ))?;
 291    }
 292
 293    tracing::error!(
 294        service = "client",
 295        version = %panic.app_version,
 296        os_name = %panic.os_name,
 297        os_version = %panic.os_version.clone().unwrap_or_default(),
 298        installation_id = %panic.installation_id.unwrap_or_default(),
 299        description = %panic.payload,
 300        backtrace = %panic.backtrace.join("\n"),
 301        "panic report");
 302
 303    let backtrace = if panic.backtrace.len() > 25 {
 304        let total = panic.backtrace.len();
 305        format!(
 306            "{}\n   and {} more",
 307            panic
 308                .backtrace
 309                .iter()
 310                .take(20)
 311                .cloned()
 312                .collect::<Vec<_>>()
 313                .join("\n"),
 314            total - 20
 315        )
 316    } else {
 317        panic.backtrace.join("\n")
 318    };
 319    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 320
 321    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 322        let payload = slack::WebhookBody::new(|w| {
 323            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 324                .add_section(|s| {
 325                    s.add_field(slack::Text::markdown(format!(
 326                        "*Version:*\n {} ",
 327                        panic.app_version
 328                    )))
 329                    .add_field({
 330                        slack::Text::markdown(format!(
 331                            "*OS:*\n{} {}",
 332                            panic.os_name,
 333                            panic.os_version.unwrap_or_default()
 334                        ))
 335                    })
 336                })
 337                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 338        });
 339        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 340            log::error!("Failed to serialize payload to JSON: {err}");
 341            Error::Internal(anyhow!(err))
 342        })?;
 343
 344        reqwest::Client::new()
 345            .post(slack_panics_webhook)
 346            .header("Content-Type", "application/json")
 347            .body(payload_json)
 348            .send()
 349            .await
 350            .map_err(|err| {
 351                log::error!("Failed to send payload to Slack: {err}");
 352                Error::Internal(anyhow!(err))
 353            })?;
 354    }
 355
 356    Ok(())
 357}
 358
 359pub async fn post_events(
 360    Extension(app): Extension<Arc<AppState>>,
 361    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 362    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 363    body: Bytes,
 364) -> Result<()> {
 365    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 366        Err(Error::http(
 367            StatusCode::NOT_IMPLEMENTED,
 368            "not supported".into(),
 369        ))?
 370    };
 371
 372    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 373        return Err(Error::http(
 374            StatusCode::INTERNAL_SERVER_ERROR,
 375            "events not enabled".into(),
 376        ))?;
 377    };
 378
 379    let checksum_matched = checksum == expected;
 380
 381    let request_body: telemetry_events::EventRequestBody =
 382        serde_json::from_slice(&body).map_err(|err| {
 383            log::error!("can't parse event json: {err}");
 384            Error::Internal(anyhow!(err))
 385        })?;
 386
 387    let mut to_upload = ToUpload::default();
 388    let Some(last_event) = request_body.events.last() else {
 389        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 390    };
 391    let country_code = country_code_header.map(|h| h.to_string());
 392
 393    let first_event_at = chrono::Utc::now()
 394        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 395
 396    for wrapper in &request_body.events {
 397        match &wrapper.event {
 398            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 399                event.clone(),
 400                wrapper,
 401                &request_body,
 402                first_event_at,
 403                country_code.clone(),
 404                checksum_matched,
 405            )),
 406            // Needed for clients sending old copilot_event types
 407            Event::Copilot(_) => {}
 408            Event::InlineCompletion(event) => {
 409                to_upload
 410                    .inline_completion_events
 411                    .push(InlineCompletionEventRow::from_event(
 412                        event.clone(),
 413                        wrapper,
 414                        &request_body,
 415                        first_event_at,
 416                        country_code.clone(),
 417                        checksum_matched,
 418                    ))
 419            }
 420            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 421                event.clone(),
 422                wrapper,
 423                &request_body,
 424                first_event_at,
 425                checksum_matched,
 426            )),
 427            Event::Assistant(event) => {
 428                to_upload
 429                    .assistant_events
 430                    .push(AssistantEventRow::from_event(
 431                        event.clone(),
 432                        wrapper,
 433                        &request_body,
 434                        first_event_at,
 435                        checksum_matched,
 436                    ))
 437            }
 438            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 439                event.clone(),
 440                wrapper,
 441                &request_body,
 442                first_event_at,
 443                checksum_matched,
 444            )),
 445            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 446                event.clone(),
 447                wrapper,
 448                &request_body,
 449                first_event_at,
 450                checksum_matched,
 451            )),
 452            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 453                event.clone(),
 454                wrapper,
 455                &request_body,
 456                first_event_at,
 457                checksum_matched,
 458            )),
 459            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 460                event.clone(),
 461                wrapper,
 462                &request_body,
 463                first_event_at,
 464                checksum_matched,
 465            )),
 466            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 467                event.clone(),
 468                wrapper,
 469                &request_body,
 470                first_event_at,
 471                checksum_matched,
 472            )),
 473            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 474                event.clone(),
 475                wrapper,
 476                &request_body,
 477                first_event_at,
 478                checksum_matched,
 479            )),
 480            Event::Extension(event) => {
 481                let metadata = app
 482                    .db
 483                    .get_extension_version(&event.extension_id, &event.version)
 484                    .await?;
 485                to_upload
 486                    .extension_events
 487                    .push(ExtensionEventRow::from_event(
 488                        event.clone(),
 489                        wrapper,
 490                        &request_body,
 491                        metadata,
 492                        first_event_at,
 493                        checksum_matched,
 494                    ))
 495            }
 496            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 497                event.clone(),
 498                wrapper,
 499                &request_body,
 500                first_event_at,
 501                checksum_matched,
 502            )),
 503        }
 504    }
 505
 506    to_upload
 507        .upload(&clickhouse_client)
 508        .await
 509        .map_err(|err| Error::Internal(anyhow!(err)))?;
 510
 511    Ok(())
 512}
 513
 514#[derive(Default)]
 515struct ToUpload {
 516    editor_events: Vec<EditorEventRow>,
 517    inline_completion_events: Vec<InlineCompletionEventRow>,
 518    assistant_events: Vec<AssistantEventRow>,
 519    call_events: Vec<CallEventRow>,
 520    cpu_events: Vec<CpuEventRow>,
 521    memory_events: Vec<MemoryEventRow>,
 522    app_events: Vec<AppEventRow>,
 523    setting_events: Vec<SettingEventRow>,
 524    extension_events: Vec<ExtensionEventRow>,
 525    edit_events: Vec<EditEventRow>,
 526    action_events: Vec<ActionEventRow>,
 527    repl_events: Vec<ReplEventRow>,
 528}
 529
 530impl ToUpload {
 531    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 532        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 533        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 534            .await
 535            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 536
 537        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 538        write_to_table(
 539            INLINE_COMPLETION_EVENTS_TABLE,
 540            &self.inline_completion_events,
 541            clickhouse_client,
 542        )
 543        .await
 544        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 545
 546        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 547        write_to_table(
 548            ASSISTANT_EVENTS_TABLE,
 549            &self.assistant_events,
 550            clickhouse_client,
 551        )
 552        .await
 553        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 554
 555        const CALL_EVENTS_TABLE: &str = "call_events";
 556        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 557            .await
 558            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 559
 560        const CPU_EVENTS_TABLE: &str = "cpu_events";
 561        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 562            .await
 563            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 564
 565        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 566        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 567            .await
 568            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 569
 570        const APP_EVENTS_TABLE: &str = "app_events";
 571        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 572            .await
 573            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 574
 575        const SETTING_EVENTS_TABLE: &str = "setting_events";
 576        write_to_table(
 577            SETTING_EVENTS_TABLE,
 578            &self.setting_events,
 579            clickhouse_client,
 580        )
 581        .await
 582        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 583
 584        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 585        write_to_table(
 586            EXTENSION_EVENTS_TABLE,
 587            &self.extension_events,
 588            clickhouse_client,
 589        )
 590        .await
 591        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 592
 593        const EDIT_EVENTS_TABLE: &str = "edit_events";
 594        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 595            .await
 596            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 597
 598        const ACTION_EVENTS_TABLE: &str = "action_events";
 599        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 600            .await
 601            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 602
 603        const REPL_EVENTS_TABLE: &str = "repl_events";
 604        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 605            .await
 606            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 607
 608        Ok(())
 609    }
 610}
 611
 612pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 613where
 614    S: Serializer,
 615{
 616    if country_code.len() != 2 {
 617        use serde::ser::Error;
 618        return Err(S::Error::custom(
 619            "country_code must be exactly 2 characters",
 620        ));
 621    }
 622
 623    let country_code = country_code.as_bytes();
 624
 625    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 626}
 627
 628#[derive(Serialize, Debug, clickhouse::Row)]
 629pub struct EditorEventRow {
 630    installation_id: String,
 631    metrics_id: String,
 632    operation: String,
 633    app_version: String,
 634    file_extension: String,
 635    os_name: String,
 636    os_version: String,
 637    release_channel: String,
 638    signed_in: bool,
 639    vim_mode: bool,
 640    #[serde(serialize_with = "serialize_country_code")]
 641    country_code: String,
 642    region_code: String,
 643    city: String,
 644    time: i64,
 645    copilot_enabled: bool,
 646    copilot_enabled_for_language: bool,
 647    historical_event: bool,
 648    architecture: String,
 649    is_staff: Option<bool>,
 650    session_id: Option<String>,
 651    major: Option<i32>,
 652    minor: Option<i32>,
 653    patch: Option<i32>,
 654    checksum_matched: bool,
 655}
 656
 657impl EditorEventRow {
 658    fn from_event(
 659        event: EditorEvent,
 660        wrapper: &EventWrapper,
 661        body: &EventRequestBody,
 662        first_event_at: chrono::DateTime<chrono::Utc>,
 663        country_code: Option<String>,
 664        checksum_matched: bool,
 665    ) -> Self {
 666        let semver = body.semver();
 667        let time =
 668            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 669
 670        Self {
 671            app_version: body.app_version.clone(),
 672            major: semver.map(|v| v.major() as i32),
 673            minor: semver.map(|v| v.minor() as i32),
 674            patch: semver.map(|v| v.patch() as i32),
 675            checksum_matched,
 676            release_channel: body.release_channel.clone().unwrap_or_default(),
 677            os_name: body.os_name.clone(),
 678            os_version: body.os_version.clone().unwrap_or_default(),
 679            architecture: body.architecture.clone(),
 680            installation_id: body.installation_id.clone().unwrap_or_default(),
 681            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 682            session_id: body.session_id.clone(),
 683            is_staff: body.is_staff,
 684            time: time.timestamp_millis(),
 685            operation: event.operation,
 686            file_extension: event.file_extension.unwrap_or_default(),
 687            signed_in: wrapper.signed_in,
 688            vim_mode: event.vim_mode,
 689            copilot_enabled: event.copilot_enabled,
 690            copilot_enabled_for_language: event.copilot_enabled_for_language,
 691            country_code: country_code.unwrap_or("XX".to_string()),
 692            region_code: "".to_string(),
 693            city: "".to_string(),
 694            historical_event: false,
 695        }
 696    }
 697}
 698
 699#[derive(Serialize, Debug, clickhouse::Row)]
 700pub struct InlineCompletionEventRow {
 701    installation_id: String,
 702    provider: String,
 703    suggestion_accepted: bool,
 704    app_version: String,
 705    file_extension: String,
 706    os_name: String,
 707    os_version: String,
 708    release_channel: String,
 709    signed_in: bool,
 710    #[serde(serialize_with = "serialize_country_code")]
 711    country_code: String,
 712    region_code: String,
 713    city: String,
 714    time: i64,
 715    is_staff: Option<bool>,
 716    session_id: Option<String>,
 717    major: Option<i32>,
 718    minor: Option<i32>,
 719    patch: Option<i32>,
 720    checksum_matched: bool,
 721}
 722
 723impl InlineCompletionEventRow {
 724    fn from_event(
 725        event: InlineCompletionEvent,
 726        wrapper: &EventWrapper,
 727        body: &EventRequestBody,
 728        first_event_at: chrono::DateTime<chrono::Utc>,
 729        country_code: Option<String>,
 730        checksum_matched: bool,
 731    ) -> Self {
 732        let semver = body.semver();
 733        let time =
 734            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 735
 736        Self {
 737            app_version: body.app_version.clone(),
 738            major: semver.map(|v| v.major() as i32),
 739            minor: semver.map(|v| v.minor() as i32),
 740            patch: semver.map(|v| v.patch() as i32),
 741            checksum_matched,
 742            release_channel: body.release_channel.clone().unwrap_or_default(),
 743            os_name: body.os_name.clone(),
 744            os_version: body.os_version.clone().unwrap_or_default(),
 745            installation_id: body.installation_id.clone().unwrap_or_default(),
 746            session_id: body.session_id.clone(),
 747            is_staff: body.is_staff,
 748            time: time.timestamp_millis(),
 749            file_extension: event.file_extension.unwrap_or_default(),
 750            signed_in: wrapper.signed_in,
 751            country_code: country_code.unwrap_or("XX".to_string()),
 752            region_code: "".to_string(),
 753            city: "".to_string(),
 754            provider: event.provider,
 755            suggestion_accepted: event.suggestion_accepted,
 756        }
 757    }
 758}
 759
 760#[derive(Serialize, Debug, clickhouse::Row)]
 761pub struct CallEventRow {
 762    // AppInfoBase
 763    app_version: String,
 764    major: Option<i32>,
 765    minor: Option<i32>,
 766    patch: Option<i32>,
 767    release_channel: String,
 768    os_name: String,
 769    os_version: String,
 770    checksum_matched: bool,
 771
 772    // ClientEventBase
 773    installation_id: String,
 774    session_id: Option<String>,
 775    is_staff: Option<bool>,
 776    time: i64,
 777
 778    // CallEventRow
 779    operation: String,
 780    room_id: Option<u64>,
 781    channel_id: Option<u64>,
 782}
 783
 784impl CallEventRow {
 785    fn from_event(
 786        event: CallEvent,
 787        wrapper: &EventWrapper,
 788        body: &EventRequestBody,
 789        first_event_at: chrono::DateTime<chrono::Utc>,
 790        checksum_matched: bool,
 791    ) -> Self {
 792        let semver = body.semver();
 793        let time =
 794            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 795
 796        Self {
 797            app_version: body.app_version.clone(),
 798            major: semver.map(|v| v.major() as i32),
 799            minor: semver.map(|v| v.minor() as i32),
 800            patch: semver.map(|v| v.patch() as i32),
 801            checksum_matched,
 802            release_channel: body.release_channel.clone().unwrap_or_default(),
 803            os_name: body.os_name.clone(),
 804            os_version: body.os_version.clone().unwrap_or_default(),
 805            installation_id: body.installation_id.clone().unwrap_or_default(),
 806            session_id: body.session_id.clone(),
 807            is_staff: body.is_staff,
 808            time: time.timestamp_millis(),
 809            operation: event.operation,
 810            room_id: event.room_id,
 811            channel_id: event.channel_id,
 812        }
 813    }
 814}
 815
 816#[derive(Serialize, Debug, clickhouse::Row)]
 817pub struct AssistantEventRow {
 818    // AppInfoBase
 819    app_version: String,
 820    major: Option<i32>,
 821    minor: Option<i32>,
 822    patch: Option<i32>,
 823    checksum_matched: bool,
 824    release_channel: String,
 825    os_name: String,
 826    os_version: String,
 827
 828    // ClientEventBase
 829    installation_id: Option<String>,
 830    session_id: Option<String>,
 831    is_staff: Option<bool>,
 832    time: i64,
 833
 834    // AssistantEventRow
 835    conversation_id: String,
 836    kind: String,
 837    phase: String,
 838    model: String,
 839    response_latency_in_ms: Option<i64>,
 840    error_message: Option<String>,
 841}
 842
 843impl AssistantEventRow {
 844    fn from_event(
 845        event: AssistantEvent,
 846        wrapper: &EventWrapper,
 847        body: &EventRequestBody,
 848        first_event_at: chrono::DateTime<chrono::Utc>,
 849        checksum_matched: bool,
 850    ) -> Self {
 851        let semver = body.semver();
 852        let time =
 853            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 854
 855        Self {
 856            app_version: body.app_version.clone(),
 857            major: semver.map(|v| v.major() as i32),
 858            minor: semver.map(|v| v.minor() as i32),
 859            patch: semver.map(|v| v.patch() as i32),
 860            checksum_matched,
 861            release_channel: body.release_channel.clone().unwrap_or_default(),
 862            os_name: body.os_name.clone(),
 863            os_version: body.os_version.clone().unwrap_or_default(),
 864            installation_id: body.installation_id.clone(),
 865            session_id: body.session_id.clone(),
 866            is_staff: body.is_staff,
 867            time: time.timestamp_millis(),
 868            conversation_id: event.conversation_id.unwrap_or_default(),
 869            kind: event.kind.to_string(),
 870            phase: event.phase.to_string(),
 871            model: event.model,
 872            response_latency_in_ms: event
 873                .response_latency
 874                .map(|latency| latency.as_millis() as i64),
 875            error_message: event.error_message,
 876        }
 877    }
 878}
 879
 880#[derive(Debug, clickhouse::Row, Serialize)]
 881pub struct CpuEventRow {
 882    installation_id: Option<String>,
 883    is_staff: Option<bool>,
 884    usage_as_percentage: f32,
 885    core_count: u32,
 886    app_version: String,
 887    release_channel: String,
 888    os_name: String,
 889    os_version: String,
 890    time: i64,
 891    session_id: Option<String>,
 892    // pub normalized_cpu_usage: f64, MATERIALIZED
 893    major: Option<i32>,
 894    minor: Option<i32>,
 895    patch: Option<i32>,
 896    checksum_matched: bool,
 897}
 898
 899impl CpuEventRow {
 900    fn from_event(
 901        event: CpuEvent,
 902        wrapper: &EventWrapper,
 903        body: &EventRequestBody,
 904        first_event_at: chrono::DateTime<chrono::Utc>,
 905        checksum_matched: bool,
 906    ) -> Self {
 907        let semver = body.semver();
 908        let time =
 909            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 910
 911        Self {
 912            app_version: body.app_version.clone(),
 913            major: semver.map(|v| v.major() as i32),
 914            minor: semver.map(|v| v.minor() as i32),
 915            patch: semver.map(|v| v.patch() as i32),
 916            checksum_matched,
 917            release_channel: body.release_channel.clone().unwrap_or_default(),
 918            os_name: body.os_name.clone(),
 919            os_version: body.os_version.clone().unwrap_or_default(),
 920            installation_id: body.installation_id.clone(),
 921            session_id: body.session_id.clone(),
 922            is_staff: body.is_staff,
 923            time: time.timestamp_millis(),
 924            usage_as_percentage: event.usage_as_percentage,
 925            core_count: event.core_count,
 926        }
 927    }
 928}
 929
 930#[derive(Serialize, Debug, clickhouse::Row)]
 931pub struct MemoryEventRow {
 932    // AppInfoBase
 933    app_version: String,
 934    major: Option<i32>,
 935    minor: Option<i32>,
 936    patch: Option<i32>,
 937    checksum_matched: bool,
 938    release_channel: String,
 939    os_name: String,
 940    os_version: String,
 941
 942    // ClientEventBase
 943    installation_id: Option<String>,
 944    session_id: Option<String>,
 945    is_staff: Option<bool>,
 946    time: i64,
 947
 948    // MemoryEventRow
 949    memory_in_bytes: u64,
 950    virtual_memory_in_bytes: u64,
 951}
 952
 953impl MemoryEventRow {
 954    fn from_event(
 955        event: MemoryEvent,
 956        wrapper: &EventWrapper,
 957        body: &EventRequestBody,
 958        first_event_at: chrono::DateTime<chrono::Utc>,
 959        checksum_matched: bool,
 960    ) -> Self {
 961        let semver = body.semver();
 962        let time =
 963            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 964
 965        Self {
 966            app_version: body.app_version.clone(),
 967            major: semver.map(|v| v.major() as i32),
 968            minor: semver.map(|v| v.minor() as i32),
 969            patch: semver.map(|v| v.patch() as i32),
 970            checksum_matched,
 971            release_channel: body.release_channel.clone().unwrap_or_default(),
 972            os_name: body.os_name.clone(),
 973            os_version: body.os_version.clone().unwrap_or_default(),
 974            installation_id: body.installation_id.clone(),
 975            session_id: body.session_id.clone(),
 976            is_staff: body.is_staff,
 977            time: time.timestamp_millis(),
 978            memory_in_bytes: event.memory_in_bytes,
 979            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 980        }
 981    }
 982}
 983
 984#[derive(Serialize, Debug, clickhouse::Row)]
 985pub struct AppEventRow {
 986    // AppInfoBase
 987    app_version: String,
 988    major: Option<i32>,
 989    minor: Option<i32>,
 990    patch: Option<i32>,
 991    checksum_matched: bool,
 992    release_channel: String,
 993    os_name: String,
 994    os_version: String,
 995
 996    // ClientEventBase
 997    installation_id: Option<String>,
 998    session_id: Option<String>,
 999    is_staff: Option<bool>,
1000    time: i64,
1001
1002    // AppEventRow
1003    operation: String,
1004}
1005
1006impl AppEventRow {
1007    fn from_event(
1008        event: AppEvent,
1009        wrapper: &EventWrapper,
1010        body: &EventRequestBody,
1011        first_event_at: chrono::DateTime<chrono::Utc>,
1012        checksum_matched: bool,
1013    ) -> Self {
1014        let semver = body.semver();
1015        let time =
1016            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1017
1018        Self {
1019            app_version: body.app_version.clone(),
1020            major: semver.map(|v| v.major() as i32),
1021            minor: semver.map(|v| v.minor() as i32),
1022            patch: semver.map(|v| v.patch() as i32),
1023            checksum_matched,
1024            release_channel: body.release_channel.clone().unwrap_or_default(),
1025            os_name: body.os_name.clone(),
1026            os_version: body.os_version.clone().unwrap_or_default(),
1027            installation_id: body.installation_id.clone(),
1028            session_id: body.session_id.clone(),
1029            is_staff: body.is_staff,
1030            time: time.timestamp_millis(),
1031            operation: event.operation,
1032        }
1033    }
1034}
1035
1036#[derive(Serialize, Debug, clickhouse::Row)]
1037pub struct SettingEventRow {
1038    // AppInfoBase
1039    app_version: String,
1040    major: Option<i32>,
1041    minor: Option<i32>,
1042    patch: Option<i32>,
1043    checksum_matched: bool,
1044    release_channel: String,
1045    os_name: String,
1046    os_version: String,
1047
1048    // ClientEventBase
1049    installation_id: Option<String>,
1050    session_id: Option<String>,
1051    is_staff: Option<bool>,
1052    time: i64,
1053    // SettingEventRow
1054    setting: String,
1055    value: String,
1056}
1057
1058impl SettingEventRow {
1059    fn from_event(
1060        event: SettingEvent,
1061        wrapper: &EventWrapper,
1062        body: &EventRequestBody,
1063        first_event_at: chrono::DateTime<chrono::Utc>,
1064        checksum_matched: bool,
1065    ) -> Self {
1066        let semver = body.semver();
1067        let time =
1068            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1069
1070        Self {
1071            app_version: body.app_version.clone(),
1072            major: semver.map(|v| v.major() as i32),
1073            minor: semver.map(|v| v.minor() as i32),
1074            checksum_matched,
1075            patch: semver.map(|v| v.patch() as i32),
1076            release_channel: body.release_channel.clone().unwrap_or_default(),
1077            os_name: body.os_name.clone(),
1078            os_version: body.os_version.clone().unwrap_or_default(),
1079            installation_id: body.installation_id.clone(),
1080            session_id: body.session_id.clone(),
1081            is_staff: body.is_staff,
1082            time: time.timestamp_millis(),
1083            setting: event.setting,
1084            value: event.value,
1085        }
1086    }
1087}
1088
1089#[derive(Serialize, Debug, clickhouse::Row)]
1090pub struct ExtensionEventRow {
1091    // AppInfoBase
1092    app_version: String,
1093    major: Option<i32>,
1094    minor: Option<i32>,
1095    patch: Option<i32>,
1096    checksum_matched: bool,
1097    release_channel: String,
1098    os_name: String,
1099    os_version: String,
1100
1101    // ClientEventBase
1102    installation_id: Option<String>,
1103    session_id: Option<String>,
1104    is_staff: Option<bool>,
1105    time: i64,
1106
1107    // ExtensionEventRow
1108    extension_id: Arc<str>,
1109    extension_version: Arc<str>,
1110    dev: bool,
1111    schema_version: Option<i32>,
1112    wasm_api_version: Option<String>,
1113}
1114
1115impl ExtensionEventRow {
1116    fn from_event(
1117        event: ExtensionEvent,
1118        wrapper: &EventWrapper,
1119        body: &EventRequestBody,
1120        extension_metadata: Option<ExtensionMetadata>,
1121        first_event_at: chrono::DateTime<chrono::Utc>,
1122        checksum_matched: bool,
1123    ) -> Self {
1124        let semver = body.semver();
1125        let time =
1126            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1127
1128        Self {
1129            app_version: body.app_version.clone(),
1130            major: semver.map(|v| v.major() as i32),
1131            minor: semver.map(|v| v.minor() as i32),
1132            patch: semver.map(|v| v.patch() as i32),
1133            checksum_matched,
1134            release_channel: body.release_channel.clone().unwrap_or_default(),
1135            os_name: body.os_name.clone(),
1136            os_version: body.os_version.clone().unwrap_or_default(),
1137            installation_id: body.installation_id.clone(),
1138            session_id: body.session_id.clone(),
1139            is_staff: body.is_staff,
1140            time: time.timestamp_millis(),
1141            extension_id: event.extension_id,
1142            extension_version: event.version,
1143            dev: extension_metadata.is_none(),
1144            schema_version: extension_metadata
1145                .as_ref()
1146                .and_then(|metadata| metadata.manifest.schema_version),
1147            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1148                metadata
1149                    .manifest
1150                    .wasm_api_version
1151                    .as_ref()
1152                    .map(|version| version.to_string())
1153            }),
1154        }
1155    }
1156}
1157
1158#[derive(Serialize, Debug, clickhouse::Row)]
1159pub struct ReplEventRow {
1160    // AppInfoBase
1161    app_version: String,
1162    major: Option<i32>,
1163    minor: Option<i32>,
1164    patch: Option<i32>,
1165    checksum_matched: bool,
1166    release_channel: String,
1167    os_name: String,
1168    os_version: String,
1169
1170    // ClientEventBase
1171    installation_id: Option<String>,
1172    session_id: Option<String>,
1173    is_staff: Option<bool>,
1174    time: i64,
1175
1176    // ReplEventRow
1177    kernel_language: String,
1178    kernel_status: String,
1179    repl_session_id: String,
1180}
1181
1182impl ReplEventRow {
1183    fn from_event(
1184        event: ReplEvent,
1185        wrapper: &EventWrapper,
1186        body: &EventRequestBody,
1187        first_event_at: chrono::DateTime<chrono::Utc>,
1188        checksum_matched: bool,
1189    ) -> Self {
1190        let semver = body.semver();
1191        let time =
1192            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1193
1194        Self {
1195            app_version: body.app_version.clone(),
1196            major: semver.map(|v| v.major() as i32),
1197            minor: semver.map(|v| v.minor() as i32),
1198            patch: semver.map(|v| v.patch() as i32),
1199            checksum_matched,
1200            release_channel: body.release_channel.clone().unwrap_or_default(),
1201            os_name: body.os_name.clone(),
1202            os_version: body.os_version.clone().unwrap_or_default(),
1203            installation_id: body.installation_id.clone(),
1204            session_id: body.session_id.clone(),
1205            is_staff: body.is_staff,
1206            time: time.timestamp_millis(),
1207            kernel_language: event.kernel_language,
1208            kernel_status: event.kernel_status,
1209            repl_session_id: event.repl_session_id,
1210        }
1211    }
1212}
1213
1214#[derive(Serialize, Debug, clickhouse::Row)]
1215pub struct EditEventRow {
1216    // AppInfoBase
1217    app_version: String,
1218    major: Option<i32>,
1219    minor: Option<i32>,
1220    patch: Option<i32>,
1221    checksum_matched: bool,
1222    release_channel: String,
1223    os_name: String,
1224    os_version: String,
1225
1226    // ClientEventBase
1227    installation_id: Option<String>,
1228    // Note: This column name has a typo in the ClickHouse table.
1229    #[serde(rename = "sesssion_id")]
1230    session_id: Option<String>,
1231    is_staff: Option<bool>,
1232    time: i64,
1233
1234    // EditEventRow
1235    period_start: i64,
1236    period_end: i64,
1237    environment: String,
1238}
1239
1240impl EditEventRow {
1241    fn from_event(
1242        event: EditEvent,
1243        wrapper: &EventWrapper,
1244        body: &EventRequestBody,
1245        first_event_at: chrono::DateTime<chrono::Utc>,
1246        checksum_matched: bool,
1247    ) -> Self {
1248        let semver = body.semver();
1249        let time =
1250            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1251
1252        let period_start = time - chrono::Duration::milliseconds(event.duration);
1253        let period_end = time;
1254
1255        Self {
1256            app_version: body.app_version.clone(),
1257            major: semver.map(|v| v.major() as i32),
1258            minor: semver.map(|v| v.minor() as i32),
1259            patch: semver.map(|v| v.patch() as i32),
1260            checksum_matched,
1261            release_channel: body.release_channel.clone().unwrap_or_default(),
1262            os_name: body.os_name.clone(),
1263            os_version: body.os_version.clone().unwrap_or_default(),
1264            installation_id: body.installation_id.clone(),
1265            session_id: body.session_id.clone(),
1266            is_staff: body.is_staff,
1267            time: time.timestamp_millis(),
1268            period_start: period_start.timestamp_millis(),
1269            period_end: period_end.timestamp_millis(),
1270            environment: event.environment,
1271        }
1272    }
1273}
1274
1275#[derive(Serialize, Debug, clickhouse::Row)]
1276pub struct ActionEventRow {
1277    // AppInfoBase
1278    app_version: String,
1279    major: Option<i32>,
1280    minor: Option<i32>,
1281    patch: Option<i32>,
1282    checksum_matched: bool,
1283    release_channel: String,
1284    os_name: String,
1285    os_version: String,
1286
1287    // ClientEventBase
1288    installation_id: Option<String>,
1289    // Note: This column name has a typo in the ClickHouse table.
1290    #[serde(rename = "sesssion_id")]
1291    session_id: Option<String>,
1292    is_staff: Option<bool>,
1293    time: i64,
1294    // ActionEventRow
1295    source: String,
1296    action: String,
1297}
1298
1299impl ActionEventRow {
1300    fn from_event(
1301        event: ActionEvent,
1302        wrapper: &EventWrapper,
1303        body: &EventRequestBody,
1304        first_event_at: chrono::DateTime<chrono::Utc>,
1305        checksum_matched: bool,
1306    ) -> Self {
1307        let semver = body.semver();
1308        let time =
1309            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1310
1311        Self {
1312            app_version: body.app_version.clone(),
1313            major: semver.map(|v| v.major() as i32),
1314            minor: semver.map(|v| v.minor() as i32),
1315            patch: semver.map(|v| v.patch() as i32),
1316            checksum_matched,
1317            release_channel: body.release_channel.clone().unwrap_or_default(),
1318            os_name: body.os_name.clone(),
1319            os_version: body.os_version.clone().unwrap_or_default(),
1320            installation_id: body.installation_id.clone(),
1321            session_id: body.session_id.clone(),
1322            is_staff: body.is_staff,
1323            time: time.timestamp_millis(),
1324            source: event.source,
1325            action: event.action,
1326        }
1327    }
1328}
1329
1330pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1331    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1332
1333    let mut summer = Sha256::new();
1334    summer.update(checksum_seed);
1335    summer.update(json);
1336    summer.update(checksum_seed);
1337    Some(summer.finalize().into_iter().collect())
1338}