events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::clickhouse::write_to_table;
   4use crate::{api::slack, AppState, Error, Result};
   5use anyhow::{anyhow, Context};
   6use aws_sdk_s3::primitives::ByteStream;
   7use axum::{
   8    body::Bytes,
   9    headers::Header,
  10    http::{HeaderMap, HeaderName, StatusCode},
  11    routing::post,
  12    Extension, Router, TypedHeader,
  13};
  14use rpc::ExtensionMetadata;
  15use semantic_version::SemanticVersion;
  16use serde::{Serialize, Serializer};
  17use sha2::{Digest, Sha256};
  18use std::sync::{Arc, OnceLock};
  19use telemetry_events::{
  20    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  21    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
  22    SettingEvent,
  23};
  24use uuid::Uuid;
  25
  26static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  27
  28pub fn router() -> Router {
  29    Router::new()
  30        .route("/telemetry/events", post(post_events))
  31        .route("/telemetry/crashes", post(post_crash))
  32        .route("/telemetry/panics", post(post_panic))
  33        .route("/telemetry/hangs", post(post_hang))
  34}
  35
  36pub struct ZedChecksumHeader(Vec<u8>);
  37
  38impl Header for ZedChecksumHeader {
  39    fn name() -> &'static HeaderName {
  40        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  41        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  42    }
  43
  44    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  45    where
  46        Self: Sized,
  47        I: Iterator<Item = &'i axum::http::HeaderValue>,
  48    {
  49        let checksum = values
  50            .next()
  51            .ok_or_else(axum::headers::Error::invalid)?
  52            .to_str()
  53            .map_err(|_| axum::headers::Error::invalid())?;
  54
  55        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  56        Ok(Self(bytes))
  57    }
  58
  59    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  60        unimplemented!()
  61    }
  62}
  63
  64pub async fn post_crash(
  65    Extension(app): Extension<Arc<AppState>>,
  66    headers: HeaderMap,
  67    body: Bytes,
  68) -> Result<()> {
  69    let report = IpsFile::parse(&body)?;
  70    let version_threshold = SemanticVersion::new(0, 123, 0);
  71
  72    let bundle_id = &report.header.bundle_id;
  73    let app_version = &report.app_version();
  74
  75    if bundle_id == "dev.zed.Zed-Dev" {
  76        log::error!("Crash uploads from {} are ignored.", bundle_id);
  77        return Ok(());
  78    }
  79
  80    if app_version.is_none() || app_version.unwrap() < version_threshold {
  81        log::error!(
  82            "Crash uploads from {} are ignored.",
  83            report.header.app_version
  84        );
  85        return Ok(());
  86    }
  87    let app_version = app_version.unwrap();
  88
  89    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  90        let response = blob_store_client
  91            .head_object()
  92            .bucket(CRASH_REPORTS_BUCKET)
  93            .key(report.header.incident_id.clone() + ".ips")
  94            .send()
  95            .await;
  96
  97        if response.is_ok() {
  98            log::info!("We've already uploaded this crash");
  99            return Ok(());
 100        }
 101
 102        blob_store_client
 103            .put_object()
 104            .bucket(CRASH_REPORTS_BUCKET)
 105            .key(report.header.incident_id.clone() + ".ips")
 106            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 107            .body(ByteStream::from(body.to_vec()))
 108            .send()
 109            .await
 110            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 111            .ok();
 112    }
 113
 114    let recent_panic_on: Option<i64> = headers
 115        .get("x-zed-panicked-on")
 116        .and_then(|h| h.to_str().ok())
 117        .and_then(|s| s.parse().ok());
 118
 119    let installation_id = headers
 120        .get("x-zed-installation-id")
 121        .and_then(|h| h.to_str().ok())
 122        .map(|s| s.to_string())
 123        .unwrap_or_default();
 124
 125    let mut recent_panic = None;
 126
 127    if let Some(recent_panic_on) = recent_panic_on {
 128        let crashed_at = match report.timestamp() {
 129            Ok(t) => Some(t),
 130            Err(e) => {
 131                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 132                None
 133            }
 134        };
 135        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 136            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 137        }
 138    }
 139
 140    let description = report.description(recent_panic);
 141    let summary = report.backtrace_summary();
 142
 143    tracing::error!(
 144        service = "client",
 145        version = %report.header.app_version,
 146        os_version = %report.header.os_version,
 147        bundle_id = %report.header.bundle_id,
 148        incident_id = %report.header.incident_id,
 149        installation_id = %installation_id,
 150        description = %description,
 151        backtrace = %summary,
 152        "crash report");
 153
 154    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 155        let payload = slack::WebhookBody::new(|w| {
 156            w.add_section(|s| s.text(slack::Text::markdown(description)))
 157                .add_section(|s| {
 158                    s.add_field(slack::Text::markdown(format!(
 159                        "*Version:*\n{} ({})",
 160                        bundle_id, app_version
 161                    )))
 162                    .add_field({
 163                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 164                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 165                            hostname.strip_prefix("http://").unwrap_or_default()
 166                        });
 167
 168                        slack::Text::markdown(format!(
 169                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 170                            CRASH_REPORTS_BUCKET,
 171                            hostname,
 172                            report.header.incident_id,
 173                            report
 174                                .header
 175                                .incident_id
 176                                .chars()
 177                                .take(8)
 178                                .collect::<String>(),
 179                        ))
 180                    })
 181                })
 182                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 183        });
 184        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 185            log::error!("Failed to serialize payload to JSON: {err}");
 186            Error::Internal(anyhow!(err))
 187        })?;
 188
 189        reqwest::Client::new()
 190            .post(slack_panics_webhook)
 191            .header("Content-Type", "application/json")
 192            .body(payload_json)
 193            .send()
 194            .await
 195            .map_err(|err| {
 196                log::error!("Failed to send payload to Slack: {err}");
 197                Error::Internal(anyhow!(err))
 198            })?;
 199    }
 200
 201    Ok(())
 202}
 203
 204pub async fn post_hang(
 205    Extension(app): Extension<Arc<AppState>>,
 206    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 207    body: Bytes,
 208) -> Result<()> {
 209    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 210        return Err(Error::http(
 211            StatusCode::INTERNAL_SERVER_ERROR,
 212            "events not enabled".into(),
 213        ))?;
 214    };
 215
 216    if checksum != expected {
 217        return Err(Error::http(
 218            StatusCode::BAD_REQUEST,
 219            "invalid checksum".into(),
 220        ))?;
 221    }
 222
 223    let incident_id = Uuid::new_v4().to_string();
 224
 225    // dump JSON into S3 so we can get frame offsets if we need to.
 226    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 227        blob_store_client
 228            .put_object()
 229            .bucket(CRASH_REPORTS_BUCKET)
 230            .key(incident_id.clone() + ".hang.json")
 231            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 232            .body(ByteStream::from(body.to_vec()))
 233            .send()
 234            .await
 235            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 236            .ok();
 237    }
 238
 239    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 240        log::error!("can't parse report json: {err}");
 241        Error::Internal(anyhow!(err))
 242    })?;
 243
 244    let mut backtrace = "Possible hang detected on main thread:".to_string();
 245    let unknown = "<unknown>".to_string();
 246    for frame in report.backtrace.iter() {
 247        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 248    }
 249
 250    tracing::error!(
 251        service = "client",
 252        version = %report.app_version.unwrap_or_default().to_string(),
 253        os_name = %report.os_name,
 254        os_version = report.os_version.unwrap_or_default().to_string(),
 255        incident_id = %incident_id,
 256        installation_id = %report.installation_id.unwrap_or_default(),
 257        backtrace = %backtrace,
 258        "hang report");
 259
 260    Ok(())
 261}
 262
 263pub async fn post_panic(
 264    Extension(app): Extension<Arc<AppState>>,
 265    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 266    body: Bytes,
 267) -> Result<()> {
 268    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 269        return Err(Error::http(
 270            StatusCode::INTERNAL_SERVER_ERROR,
 271            "events not enabled".into(),
 272        ))?;
 273    };
 274
 275    if checksum != expected {
 276        return Err(Error::http(
 277            StatusCode::BAD_REQUEST,
 278            "invalid checksum".into(),
 279        ))?;
 280    }
 281
 282    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 283        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 284    let panic = report.panic;
 285
 286    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 287        return Err(Error::http(
 288            StatusCode::BAD_REQUEST,
 289            "invalid os version".into(),
 290        ))?;
 291    }
 292
 293    tracing::error!(
 294        service = "client",
 295        version = %panic.app_version,
 296        os_name = %panic.os_name,
 297        os_version = %panic.os_version.clone().unwrap_or_default(),
 298        installation_id = %panic.installation_id.unwrap_or_default(),
 299        description = %panic.payload,
 300        backtrace = %panic.backtrace.join("\n"),
 301        "panic report");
 302
 303    let backtrace = if panic.backtrace.len() > 25 {
 304        let total = panic.backtrace.len();
 305        format!(
 306            "{}\n   and {} more",
 307            panic
 308                .backtrace
 309                .iter()
 310                .take(20)
 311                .cloned()
 312                .collect::<Vec<_>>()
 313                .join("\n"),
 314            total - 20
 315        )
 316    } else {
 317        panic.backtrace.join("\n")
 318    };
 319    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 320
 321    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 322        let payload = slack::WebhookBody::new(|w| {
 323            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 324                .add_section(|s| {
 325                    s.add_field(slack::Text::markdown(format!(
 326                        "*Version:*\n {} ",
 327                        panic.app_version
 328                    )))
 329                    .add_field({
 330                        slack::Text::markdown(format!(
 331                            "*OS:*\n{} {}",
 332                            panic.os_name,
 333                            panic.os_version.unwrap_or_default()
 334                        ))
 335                    })
 336                })
 337                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 338        });
 339        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 340            log::error!("Failed to serialize payload to JSON: {err}");
 341            Error::Internal(anyhow!(err))
 342        })?;
 343
 344        reqwest::Client::new()
 345            .post(slack_panics_webhook)
 346            .header("Content-Type", "application/json")
 347            .body(payload_json)
 348            .send()
 349            .await
 350            .map_err(|err| {
 351                log::error!("Failed to send payload to Slack: {err}");
 352                Error::Internal(anyhow!(err))
 353            })?;
 354    }
 355
 356    Ok(())
 357}
 358
 359pub async fn post_events(
 360    Extension(app): Extension<Arc<AppState>>,
 361    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 362    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 363    body: Bytes,
 364) -> Result<()> {
 365    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 366        Err(Error::http(
 367            StatusCode::NOT_IMPLEMENTED,
 368            "not supported".into(),
 369        ))?
 370    };
 371
 372    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 373        return Err(Error::http(
 374            StatusCode::INTERNAL_SERVER_ERROR,
 375            "events not enabled".into(),
 376        ))?;
 377    };
 378
 379    let checksum_matched = checksum == expected;
 380
 381    let request_body: telemetry_events::EventRequestBody =
 382        serde_json::from_slice(&body).map_err(|err| {
 383            log::error!("can't parse event json: {err}");
 384            Error::Internal(anyhow!(err))
 385        })?;
 386
 387    let mut to_upload = ToUpload::default();
 388    let Some(last_event) = request_body.events.last() else {
 389        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 390    };
 391    let country_code = country_code_header.map(|h| h.to_string());
 392
 393    let first_event_at = chrono::Utc::now()
 394        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 395
 396    for wrapper in &request_body.events {
 397        match &wrapper.event {
 398            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 399                event.clone(),
 400                wrapper,
 401                &request_body,
 402                first_event_at,
 403                country_code.clone(),
 404                checksum_matched,
 405            )),
 406            // Needed for clients sending old copilot_event types
 407            Event::Copilot(_) => {}
 408            Event::InlineCompletion(event) => {
 409                to_upload
 410                    .inline_completion_events
 411                    .push(InlineCompletionEventRow::from_event(
 412                        event.clone(),
 413                        wrapper,
 414                        &request_body,
 415                        first_event_at,
 416                        country_code.clone(),
 417                        checksum_matched,
 418                    ))
 419            }
 420            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 421                event.clone(),
 422                wrapper,
 423                &request_body,
 424                first_event_at,
 425                checksum_matched,
 426            )),
 427            Event::Assistant(event) => {
 428                to_upload
 429                    .assistant_events
 430                    .push(AssistantEventRow::from_event(
 431                        event.clone(),
 432                        wrapper,
 433                        &request_body,
 434                        first_event_at,
 435                        checksum_matched,
 436                    ))
 437            }
 438            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 439                event.clone(),
 440                wrapper,
 441                &request_body,
 442                first_event_at,
 443                checksum_matched,
 444            )),
 445            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 446                event.clone(),
 447                wrapper,
 448                &request_body,
 449                first_event_at,
 450                checksum_matched,
 451            )),
 452            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 453                event.clone(),
 454                wrapper,
 455                &request_body,
 456                first_event_at,
 457                checksum_matched,
 458            )),
 459            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 460                event.clone(),
 461                wrapper,
 462                &request_body,
 463                first_event_at,
 464                checksum_matched,
 465            )),
 466            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 467                event.clone(),
 468                wrapper,
 469                &request_body,
 470                first_event_at,
 471                checksum_matched,
 472            )),
 473            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 474                event.clone(),
 475                wrapper,
 476                &request_body,
 477                first_event_at,
 478                checksum_matched,
 479            )),
 480            Event::Extension(event) => {
 481                let metadata = app
 482                    .db
 483                    .get_extension_version(&event.extension_id, &event.version)
 484                    .await?;
 485                to_upload
 486                    .extension_events
 487                    .push(ExtensionEventRow::from_event(
 488                        event.clone(),
 489                        wrapper,
 490                        &request_body,
 491                        metadata,
 492                        first_event_at,
 493                        checksum_matched,
 494                    ))
 495            }
 496            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 497                event.clone(),
 498                wrapper,
 499                &request_body,
 500                first_event_at,
 501                checksum_matched,
 502            )),
 503        }
 504    }
 505
 506    to_upload
 507        .upload(&clickhouse_client)
 508        .await
 509        .map_err(|err| Error::Internal(anyhow!(err)))?;
 510
 511    Ok(())
 512}
 513
 514#[derive(Default)]
 515struct ToUpload {
 516    editor_events: Vec<EditorEventRow>,
 517    inline_completion_events: Vec<InlineCompletionEventRow>,
 518    assistant_events: Vec<AssistantEventRow>,
 519    call_events: Vec<CallEventRow>,
 520    cpu_events: Vec<CpuEventRow>,
 521    memory_events: Vec<MemoryEventRow>,
 522    app_events: Vec<AppEventRow>,
 523    setting_events: Vec<SettingEventRow>,
 524    extension_events: Vec<ExtensionEventRow>,
 525    edit_events: Vec<EditEventRow>,
 526    action_events: Vec<ActionEventRow>,
 527    repl_events: Vec<ReplEventRow>,
 528}
 529
 530impl ToUpload {
 531    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 532        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 533        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 534            .await
 535            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 536
 537        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 538        write_to_table(
 539            INLINE_COMPLETION_EVENTS_TABLE,
 540            &self.inline_completion_events,
 541            clickhouse_client,
 542        )
 543        .await
 544        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 545
 546        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 547        write_to_table(
 548            ASSISTANT_EVENTS_TABLE,
 549            &self.assistant_events,
 550            clickhouse_client,
 551        )
 552        .await
 553        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 554
 555        const CALL_EVENTS_TABLE: &str = "call_events";
 556        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 557            .await
 558            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 559
 560        const CPU_EVENTS_TABLE: &str = "cpu_events";
 561        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 562            .await
 563            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 564
 565        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 566        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 567            .await
 568            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 569
 570        const APP_EVENTS_TABLE: &str = "app_events";
 571        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 572            .await
 573            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 574
 575        const SETTING_EVENTS_TABLE: &str = "setting_events";
 576        write_to_table(
 577            SETTING_EVENTS_TABLE,
 578            &self.setting_events,
 579            clickhouse_client,
 580        )
 581        .await
 582        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 583
 584        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 585        write_to_table(
 586            EXTENSION_EVENTS_TABLE,
 587            &self.extension_events,
 588            clickhouse_client,
 589        )
 590        .await
 591        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 592
 593        const EDIT_EVENTS_TABLE: &str = "edit_events";
 594        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 595            .await
 596            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 597
 598        const ACTION_EVENTS_TABLE: &str = "action_events";
 599        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 600            .await
 601            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 602
 603        const REPL_EVENTS_TABLE: &str = "repl_events";
 604        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 605            .await
 606            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 607
 608        Ok(())
 609    }
 610}
 611
 612pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 613where
 614    S: Serializer,
 615{
 616    if country_code.len() != 2 {
 617        use serde::ser::Error;
 618        return Err(S::Error::custom(
 619            "country_code must be exactly 2 characters",
 620        ));
 621    }
 622
 623    let country_code = country_code.as_bytes();
 624
 625    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 626}
 627
 628#[derive(Serialize, Debug, clickhouse::Row)]
 629pub struct EditorEventRow {
 630    installation_id: String,
 631    metrics_id: String,
 632    operation: String,
 633    app_version: String,
 634    file_extension: String,
 635    os_name: String,
 636    os_version: String,
 637    release_channel: String,
 638    signed_in: bool,
 639    vim_mode: bool,
 640    #[serde(serialize_with = "serialize_country_code")]
 641    country_code: String,
 642    region_code: String,
 643    city: String,
 644    time: i64,
 645    copilot_enabled: bool,
 646    copilot_enabled_for_language: bool,
 647    historical_event: bool,
 648    architecture: String,
 649    is_staff: Option<bool>,
 650    session_id: Option<String>,
 651    major: Option<i32>,
 652    minor: Option<i32>,
 653    patch: Option<i32>,
 654    checksum_matched: bool,
 655}
 656
 657impl EditorEventRow {
 658    fn from_event(
 659        event: EditorEvent,
 660        wrapper: &EventWrapper,
 661        body: &EventRequestBody,
 662        first_event_at: chrono::DateTime<chrono::Utc>,
 663        country_code: Option<String>,
 664        checksum_matched: bool,
 665    ) -> Self {
 666        let semver = body.semver();
 667        let time =
 668            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 669
 670        Self {
 671            app_version: body.app_version.clone(),
 672            major: semver.map(|v| v.major() as i32),
 673            minor: semver.map(|v| v.minor() as i32),
 674            patch: semver.map(|v| v.patch() as i32),
 675            checksum_matched,
 676            release_channel: body.release_channel.clone().unwrap_or_default(),
 677            os_name: body.os_name.clone(),
 678            os_version: body.os_version.clone().unwrap_or_default(),
 679            architecture: body.architecture.clone(),
 680            installation_id: body.installation_id.clone().unwrap_or_default(),
 681            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 682            session_id: body.session_id.clone(),
 683            is_staff: body.is_staff,
 684            time: time.timestamp_millis(),
 685            operation: event.operation,
 686            file_extension: event.file_extension.unwrap_or_default(),
 687            signed_in: wrapper.signed_in,
 688            vim_mode: event.vim_mode,
 689            copilot_enabled: event.copilot_enabled,
 690            copilot_enabled_for_language: event.copilot_enabled_for_language,
 691            country_code: country_code.unwrap_or("XX".to_string()),
 692            region_code: "".to_string(),
 693            city: "".to_string(),
 694            historical_event: false,
 695        }
 696    }
 697}
 698
 699#[derive(Serialize, Debug, clickhouse::Row)]
 700pub struct InlineCompletionEventRow {
 701    installation_id: String,
 702    provider: String,
 703    suggestion_accepted: bool,
 704    app_version: String,
 705    file_extension: String,
 706    os_name: String,
 707    os_version: String,
 708    release_channel: String,
 709    signed_in: bool,
 710    #[serde(serialize_with = "serialize_country_code")]
 711    country_code: String,
 712    region_code: String,
 713    city: String,
 714    time: i64,
 715    is_staff: Option<bool>,
 716    session_id: Option<String>,
 717    major: Option<i32>,
 718    minor: Option<i32>,
 719    patch: Option<i32>,
 720    checksum_matched: bool,
 721}
 722
 723impl InlineCompletionEventRow {
 724    fn from_event(
 725        event: InlineCompletionEvent,
 726        wrapper: &EventWrapper,
 727        body: &EventRequestBody,
 728        first_event_at: chrono::DateTime<chrono::Utc>,
 729        country_code: Option<String>,
 730        checksum_matched: bool,
 731    ) -> Self {
 732        let semver = body.semver();
 733        let time =
 734            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 735
 736        Self {
 737            app_version: body.app_version.clone(),
 738            major: semver.map(|v| v.major() as i32),
 739            minor: semver.map(|v| v.minor() as i32),
 740            patch: semver.map(|v| v.patch() as i32),
 741            checksum_matched,
 742            release_channel: body.release_channel.clone().unwrap_or_default(),
 743            os_name: body.os_name.clone(),
 744            os_version: body.os_version.clone().unwrap_or_default(),
 745            installation_id: body.installation_id.clone().unwrap_or_default(),
 746            session_id: body.session_id.clone(),
 747            is_staff: body.is_staff,
 748            time: time.timestamp_millis(),
 749            file_extension: event.file_extension.unwrap_or_default(),
 750            signed_in: wrapper.signed_in,
 751            country_code: country_code.unwrap_or("XX".to_string()),
 752            region_code: "".to_string(),
 753            city: "".to_string(),
 754            provider: event.provider,
 755            suggestion_accepted: event.suggestion_accepted,
 756        }
 757    }
 758}
 759
 760#[derive(Serialize, Debug, clickhouse::Row)]
 761pub struct CallEventRow {
 762    // AppInfoBase
 763    app_version: String,
 764    major: Option<i32>,
 765    minor: Option<i32>,
 766    patch: Option<i32>,
 767    release_channel: String,
 768    os_name: String,
 769    os_version: String,
 770    checksum_matched: bool,
 771
 772    // ClientEventBase
 773    installation_id: String,
 774    session_id: Option<String>,
 775    is_staff: Option<bool>,
 776    time: i64,
 777
 778    // CallEventRow
 779    operation: String,
 780    room_id: Option<u64>,
 781    channel_id: Option<u64>,
 782}
 783
 784impl CallEventRow {
 785    fn from_event(
 786        event: CallEvent,
 787        wrapper: &EventWrapper,
 788        body: &EventRequestBody,
 789        first_event_at: chrono::DateTime<chrono::Utc>,
 790        checksum_matched: bool,
 791    ) -> Self {
 792        let semver = body.semver();
 793        let time =
 794            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 795
 796        Self {
 797            app_version: body.app_version.clone(),
 798            major: semver.map(|v| v.major() as i32),
 799            minor: semver.map(|v| v.minor() as i32),
 800            patch: semver.map(|v| v.patch() as i32),
 801            checksum_matched,
 802            release_channel: body.release_channel.clone().unwrap_or_default(),
 803            os_name: body.os_name.clone(),
 804            os_version: body.os_version.clone().unwrap_or_default(),
 805            installation_id: body.installation_id.clone().unwrap_or_default(),
 806            session_id: body.session_id.clone(),
 807            is_staff: body.is_staff,
 808            time: time.timestamp_millis(),
 809            operation: event.operation,
 810            room_id: event.room_id,
 811            channel_id: event.channel_id,
 812        }
 813    }
 814}
 815
 816#[derive(Serialize, Debug, clickhouse::Row)]
 817pub struct AssistantEventRow {
 818    // AppInfoBase
 819    app_version: String,
 820    major: Option<i32>,
 821    minor: Option<i32>,
 822    patch: Option<i32>,
 823    checksum_matched: bool,
 824    release_channel: String,
 825    os_name: String,
 826    os_version: String,
 827
 828    // ClientEventBase
 829    installation_id: Option<String>,
 830    session_id: Option<String>,
 831    is_staff: Option<bool>,
 832    time: i64,
 833
 834    // AssistantEventRow
 835    conversation_id: String,
 836    kind: String,
 837    model: String,
 838    response_latency_in_ms: Option<i64>,
 839    error_message: Option<String>,
 840}
 841
 842impl AssistantEventRow {
 843    fn from_event(
 844        event: AssistantEvent,
 845        wrapper: &EventWrapper,
 846        body: &EventRequestBody,
 847        first_event_at: chrono::DateTime<chrono::Utc>,
 848        checksum_matched: bool,
 849    ) -> Self {
 850        let semver = body.semver();
 851        let time =
 852            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 853
 854        Self {
 855            app_version: body.app_version.clone(),
 856            major: semver.map(|v| v.major() as i32),
 857            minor: semver.map(|v| v.minor() as i32),
 858            patch: semver.map(|v| v.patch() as i32),
 859            checksum_matched,
 860            release_channel: body.release_channel.clone().unwrap_or_default(),
 861            os_name: body.os_name.clone(),
 862            os_version: body.os_version.clone().unwrap_or_default(),
 863            installation_id: body.installation_id.clone(),
 864            session_id: body.session_id.clone(),
 865            is_staff: body.is_staff,
 866            time: time.timestamp_millis(),
 867            conversation_id: event.conversation_id.unwrap_or_default(),
 868            kind: event.kind.to_string(),
 869            model: event.model,
 870            response_latency_in_ms: event
 871                .response_latency
 872                .map(|latency| latency.as_millis() as i64),
 873            error_message: event.error_message,
 874        }
 875    }
 876}
 877
 878#[derive(Debug, clickhouse::Row, Serialize)]
 879pub struct CpuEventRow {
 880    installation_id: Option<String>,
 881    is_staff: Option<bool>,
 882    usage_as_percentage: f32,
 883    core_count: u32,
 884    app_version: String,
 885    release_channel: String,
 886    os_name: String,
 887    os_version: String,
 888    time: i64,
 889    session_id: Option<String>,
 890    // pub normalized_cpu_usage: f64, MATERIALIZED
 891    major: Option<i32>,
 892    minor: Option<i32>,
 893    patch: Option<i32>,
 894    checksum_matched: bool,
 895}
 896
 897impl CpuEventRow {
 898    fn from_event(
 899        event: CpuEvent,
 900        wrapper: &EventWrapper,
 901        body: &EventRequestBody,
 902        first_event_at: chrono::DateTime<chrono::Utc>,
 903        checksum_matched: bool,
 904    ) -> Self {
 905        let semver = body.semver();
 906        let time =
 907            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 908
 909        Self {
 910            app_version: body.app_version.clone(),
 911            major: semver.map(|v| v.major() as i32),
 912            minor: semver.map(|v| v.minor() as i32),
 913            patch: semver.map(|v| v.patch() as i32),
 914            checksum_matched,
 915            release_channel: body.release_channel.clone().unwrap_or_default(),
 916            os_name: body.os_name.clone(),
 917            os_version: body.os_version.clone().unwrap_or_default(),
 918            installation_id: body.installation_id.clone(),
 919            session_id: body.session_id.clone(),
 920            is_staff: body.is_staff,
 921            time: time.timestamp_millis(),
 922            usage_as_percentage: event.usage_as_percentage,
 923            core_count: event.core_count,
 924        }
 925    }
 926}
 927
 928#[derive(Serialize, Debug, clickhouse::Row)]
 929pub struct MemoryEventRow {
 930    // AppInfoBase
 931    app_version: String,
 932    major: Option<i32>,
 933    minor: Option<i32>,
 934    patch: Option<i32>,
 935    checksum_matched: bool,
 936    release_channel: String,
 937    os_name: String,
 938    os_version: String,
 939
 940    // ClientEventBase
 941    installation_id: Option<String>,
 942    session_id: Option<String>,
 943    is_staff: Option<bool>,
 944    time: i64,
 945
 946    // MemoryEventRow
 947    memory_in_bytes: u64,
 948    virtual_memory_in_bytes: u64,
 949}
 950
 951impl MemoryEventRow {
 952    fn from_event(
 953        event: MemoryEvent,
 954        wrapper: &EventWrapper,
 955        body: &EventRequestBody,
 956        first_event_at: chrono::DateTime<chrono::Utc>,
 957        checksum_matched: bool,
 958    ) -> Self {
 959        let semver = body.semver();
 960        let time =
 961            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 962
 963        Self {
 964            app_version: body.app_version.clone(),
 965            major: semver.map(|v| v.major() as i32),
 966            minor: semver.map(|v| v.minor() as i32),
 967            patch: semver.map(|v| v.patch() as i32),
 968            checksum_matched,
 969            release_channel: body.release_channel.clone().unwrap_or_default(),
 970            os_name: body.os_name.clone(),
 971            os_version: body.os_version.clone().unwrap_or_default(),
 972            installation_id: body.installation_id.clone(),
 973            session_id: body.session_id.clone(),
 974            is_staff: body.is_staff,
 975            time: time.timestamp_millis(),
 976            memory_in_bytes: event.memory_in_bytes,
 977            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 978        }
 979    }
 980}
 981
 982#[derive(Serialize, Debug, clickhouse::Row)]
 983pub struct AppEventRow {
 984    // AppInfoBase
 985    app_version: String,
 986    major: Option<i32>,
 987    minor: Option<i32>,
 988    patch: Option<i32>,
 989    checksum_matched: bool,
 990    release_channel: String,
 991    os_name: String,
 992    os_version: String,
 993
 994    // ClientEventBase
 995    installation_id: Option<String>,
 996    session_id: Option<String>,
 997    is_staff: Option<bool>,
 998    time: i64,
 999
1000    // AppEventRow
1001    operation: String,
1002}
1003
1004impl AppEventRow {
1005    fn from_event(
1006        event: AppEvent,
1007        wrapper: &EventWrapper,
1008        body: &EventRequestBody,
1009        first_event_at: chrono::DateTime<chrono::Utc>,
1010        checksum_matched: bool,
1011    ) -> Self {
1012        let semver = body.semver();
1013        let time =
1014            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1015
1016        Self {
1017            app_version: body.app_version.clone(),
1018            major: semver.map(|v| v.major() as i32),
1019            minor: semver.map(|v| v.minor() as i32),
1020            patch: semver.map(|v| v.patch() as i32),
1021            checksum_matched,
1022            release_channel: body.release_channel.clone().unwrap_or_default(),
1023            os_name: body.os_name.clone(),
1024            os_version: body.os_version.clone().unwrap_or_default(),
1025            installation_id: body.installation_id.clone(),
1026            session_id: body.session_id.clone(),
1027            is_staff: body.is_staff,
1028            time: time.timestamp_millis(),
1029            operation: event.operation,
1030        }
1031    }
1032}
1033
1034#[derive(Serialize, Debug, clickhouse::Row)]
1035pub struct SettingEventRow {
1036    // AppInfoBase
1037    app_version: String,
1038    major: Option<i32>,
1039    minor: Option<i32>,
1040    patch: Option<i32>,
1041    checksum_matched: bool,
1042    release_channel: String,
1043    os_name: String,
1044    os_version: String,
1045
1046    // ClientEventBase
1047    installation_id: Option<String>,
1048    session_id: Option<String>,
1049    is_staff: Option<bool>,
1050    time: i64,
1051    // SettingEventRow
1052    setting: String,
1053    value: String,
1054}
1055
1056impl SettingEventRow {
1057    fn from_event(
1058        event: SettingEvent,
1059        wrapper: &EventWrapper,
1060        body: &EventRequestBody,
1061        first_event_at: chrono::DateTime<chrono::Utc>,
1062        checksum_matched: bool,
1063    ) -> Self {
1064        let semver = body.semver();
1065        let time =
1066            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1067
1068        Self {
1069            app_version: body.app_version.clone(),
1070            major: semver.map(|v| v.major() as i32),
1071            minor: semver.map(|v| v.minor() as i32),
1072            checksum_matched,
1073            patch: semver.map(|v| v.patch() as i32),
1074            release_channel: body.release_channel.clone().unwrap_or_default(),
1075            os_name: body.os_name.clone(),
1076            os_version: body.os_version.clone().unwrap_or_default(),
1077            installation_id: body.installation_id.clone(),
1078            session_id: body.session_id.clone(),
1079            is_staff: body.is_staff,
1080            time: time.timestamp_millis(),
1081            setting: event.setting,
1082            value: event.value,
1083        }
1084    }
1085}
1086
1087#[derive(Serialize, Debug, clickhouse::Row)]
1088pub struct ExtensionEventRow {
1089    // AppInfoBase
1090    app_version: String,
1091    major: Option<i32>,
1092    minor: Option<i32>,
1093    patch: Option<i32>,
1094    checksum_matched: bool,
1095    release_channel: String,
1096    os_name: String,
1097    os_version: String,
1098
1099    // ClientEventBase
1100    installation_id: Option<String>,
1101    session_id: Option<String>,
1102    is_staff: Option<bool>,
1103    time: i64,
1104
1105    // ExtensionEventRow
1106    extension_id: Arc<str>,
1107    extension_version: Arc<str>,
1108    dev: bool,
1109    schema_version: Option<i32>,
1110    wasm_api_version: Option<String>,
1111}
1112
1113impl ExtensionEventRow {
1114    fn from_event(
1115        event: ExtensionEvent,
1116        wrapper: &EventWrapper,
1117        body: &EventRequestBody,
1118        extension_metadata: Option<ExtensionMetadata>,
1119        first_event_at: chrono::DateTime<chrono::Utc>,
1120        checksum_matched: bool,
1121    ) -> Self {
1122        let semver = body.semver();
1123        let time =
1124            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1125
1126        Self {
1127            app_version: body.app_version.clone(),
1128            major: semver.map(|v| v.major() as i32),
1129            minor: semver.map(|v| v.minor() as i32),
1130            patch: semver.map(|v| v.patch() as i32),
1131            checksum_matched,
1132            release_channel: body.release_channel.clone().unwrap_or_default(),
1133            os_name: body.os_name.clone(),
1134            os_version: body.os_version.clone().unwrap_or_default(),
1135            installation_id: body.installation_id.clone(),
1136            session_id: body.session_id.clone(),
1137            is_staff: body.is_staff,
1138            time: time.timestamp_millis(),
1139            extension_id: event.extension_id,
1140            extension_version: event.version,
1141            dev: extension_metadata.is_none(),
1142            schema_version: extension_metadata
1143                .as_ref()
1144                .and_then(|metadata| metadata.manifest.schema_version),
1145            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1146                metadata
1147                    .manifest
1148                    .wasm_api_version
1149                    .as_ref()
1150                    .map(|version| version.to_string())
1151            }),
1152        }
1153    }
1154}
1155
1156#[derive(Serialize, Debug, clickhouse::Row)]
1157pub struct ReplEventRow {
1158    // AppInfoBase
1159    app_version: String,
1160    major: Option<i32>,
1161    minor: Option<i32>,
1162    patch: Option<i32>,
1163    checksum_matched: bool,
1164    release_channel: String,
1165    os_name: String,
1166    os_version: String,
1167
1168    // ClientEventBase
1169    installation_id: Option<String>,
1170    session_id: Option<String>,
1171    is_staff: Option<bool>,
1172    time: i64,
1173
1174    // ReplEventRow
1175    kernel_language: String,
1176    kernel_status: String,
1177    repl_session_id: String,
1178}
1179
1180impl ReplEventRow {
1181    fn from_event(
1182        event: ReplEvent,
1183        wrapper: &EventWrapper,
1184        body: &EventRequestBody,
1185        first_event_at: chrono::DateTime<chrono::Utc>,
1186        checksum_matched: bool,
1187    ) -> Self {
1188        let semver = body.semver();
1189        let time =
1190            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1191
1192        Self {
1193            app_version: body.app_version.clone(),
1194            major: semver.map(|v| v.major() as i32),
1195            minor: semver.map(|v| v.minor() as i32),
1196            patch: semver.map(|v| v.patch() as i32),
1197            checksum_matched,
1198            release_channel: body.release_channel.clone().unwrap_or_default(),
1199            os_name: body.os_name.clone(),
1200            os_version: body.os_version.clone().unwrap_or_default(),
1201            installation_id: body.installation_id.clone(),
1202            session_id: body.session_id.clone(),
1203            is_staff: body.is_staff,
1204            time: time.timestamp_millis(),
1205            kernel_language: event.kernel_language,
1206            kernel_status: event.kernel_status,
1207            repl_session_id: event.repl_session_id,
1208        }
1209    }
1210}
1211
1212#[derive(Serialize, Debug, clickhouse::Row)]
1213pub struct EditEventRow {
1214    // AppInfoBase
1215    app_version: String,
1216    major: Option<i32>,
1217    minor: Option<i32>,
1218    patch: Option<i32>,
1219    checksum_matched: bool,
1220    release_channel: String,
1221    os_name: String,
1222    os_version: String,
1223
1224    // ClientEventBase
1225    installation_id: Option<String>,
1226    // Note: This column name has a typo in the ClickHouse table.
1227    #[serde(rename = "sesssion_id")]
1228    session_id: Option<String>,
1229    is_staff: Option<bool>,
1230    time: i64,
1231
1232    // EditEventRow
1233    period_start: i64,
1234    period_end: i64,
1235    environment: String,
1236}
1237
1238impl EditEventRow {
1239    fn from_event(
1240        event: EditEvent,
1241        wrapper: &EventWrapper,
1242        body: &EventRequestBody,
1243        first_event_at: chrono::DateTime<chrono::Utc>,
1244        checksum_matched: bool,
1245    ) -> Self {
1246        let semver = body.semver();
1247        let time =
1248            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1249
1250        let period_start = time - chrono::Duration::milliseconds(event.duration);
1251        let period_end = time;
1252
1253        Self {
1254            app_version: body.app_version.clone(),
1255            major: semver.map(|v| v.major() as i32),
1256            minor: semver.map(|v| v.minor() as i32),
1257            patch: semver.map(|v| v.patch() as i32),
1258            checksum_matched,
1259            release_channel: body.release_channel.clone().unwrap_or_default(),
1260            os_name: body.os_name.clone(),
1261            os_version: body.os_version.clone().unwrap_or_default(),
1262            installation_id: body.installation_id.clone(),
1263            session_id: body.session_id.clone(),
1264            is_staff: body.is_staff,
1265            time: time.timestamp_millis(),
1266            period_start: period_start.timestamp_millis(),
1267            period_end: period_end.timestamp_millis(),
1268            environment: event.environment,
1269        }
1270    }
1271}
1272
1273#[derive(Serialize, Debug, clickhouse::Row)]
1274pub struct ActionEventRow {
1275    // AppInfoBase
1276    app_version: String,
1277    major: Option<i32>,
1278    minor: Option<i32>,
1279    patch: Option<i32>,
1280    checksum_matched: bool,
1281    release_channel: String,
1282    os_name: String,
1283    os_version: String,
1284
1285    // ClientEventBase
1286    installation_id: Option<String>,
1287    // Note: This column name has a typo in the ClickHouse table.
1288    #[serde(rename = "sesssion_id")]
1289    session_id: Option<String>,
1290    is_staff: Option<bool>,
1291    time: i64,
1292    // ActionEventRow
1293    source: String,
1294    action: String,
1295}
1296
1297impl ActionEventRow {
1298    fn from_event(
1299        event: ActionEvent,
1300        wrapper: &EventWrapper,
1301        body: &EventRequestBody,
1302        first_event_at: chrono::DateTime<chrono::Utc>,
1303        checksum_matched: bool,
1304    ) -> Self {
1305        let semver = body.semver();
1306        let time =
1307            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1308
1309        Self {
1310            app_version: body.app_version.clone(),
1311            major: semver.map(|v| v.major() as i32),
1312            minor: semver.map(|v| v.minor() as i32),
1313            patch: semver.map(|v| v.patch() as i32),
1314            checksum_matched,
1315            release_channel: body.release_channel.clone().unwrap_or_default(),
1316            os_name: body.os_name.clone(),
1317            os_version: body.os_version.clone().unwrap_or_default(),
1318            installation_id: body.installation_id.clone(),
1319            session_id: body.session_id.clone(),
1320            is_staff: body.is_staff,
1321            time: time.timestamp_millis(),
1322            source: event.source,
1323            action: event.action,
1324        }
1325    }
1326}
1327
1328pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1329    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1330
1331    let mut summer = Sha256::new();
1332    summer.update(checksum_seed);
1333    summer.update(json);
1334    summer.update(checksum_seed);
1335    Some(summer.finalize().into_iter().collect())
1336}