events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::clickhouse::write_to_table;
   4use crate::{api::slack, AppState, Error, Result};
   5use anyhow::{anyhow, Context};
   6use aws_sdk_s3::primitives::ByteStream;
   7use axum::{
   8    body::Bytes,
   9    headers::Header,
  10    http::{HeaderMap, HeaderName, StatusCode},
  11    routing::post,
  12    Extension, Router, TypedHeader,
  13};
  14use rpc::ExtensionMetadata;
  15use semantic_version::SemanticVersion;
  16use serde::{Serialize, Serializer};
  17use sha2::{Digest, Sha256};
  18use std::sync::{Arc, OnceLock};
  19use telemetry_events::{
  20    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  21    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic,
  22    ReplEvent, SettingEvent,
  23};
  24use uuid::Uuid;
  25
  26const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  27
  28pub fn router() -> Router {
  29    Router::new()
  30        .route("/telemetry/events", post(post_events))
  31        .route("/telemetry/crashes", post(post_crash))
  32        .route("/telemetry/panics", post(post_panic))
  33        .route("/telemetry/hangs", post(post_hang))
  34}
  35
  36pub struct ZedChecksumHeader(Vec<u8>);
  37
  38impl Header for ZedChecksumHeader {
  39    fn name() -> &'static HeaderName {
  40        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  41        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  42    }
  43
  44    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  45    where
  46        Self: Sized,
  47        I: Iterator<Item = &'i axum::http::HeaderValue>,
  48    {
  49        let checksum = values
  50            .next()
  51            .ok_or_else(axum::headers::Error::invalid)?
  52            .to_str()
  53            .map_err(|_| axum::headers::Error::invalid())?;
  54
  55        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  56        Ok(Self(bytes))
  57    }
  58
  59    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  60        unimplemented!()
  61    }
  62}
  63
  64pub async fn post_crash(
  65    Extension(app): Extension<Arc<AppState>>,
  66    headers: HeaderMap,
  67    body: Bytes,
  68) -> Result<()> {
  69    let report = IpsFile::parse(&body)?;
  70    let version_threshold = SemanticVersion::new(0, 123, 0);
  71
  72    let bundle_id = &report.header.bundle_id;
  73    let app_version = &report.app_version();
  74
  75    if bundle_id == "dev.zed.Zed-Dev" {
  76        log::error!("Crash uploads from {} are ignored.", bundle_id);
  77        return Ok(());
  78    }
  79
  80    if app_version.is_none() || app_version.unwrap() < version_threshold {
  81        log::error!(
  82            "Crash uploads from {} are ignored.",
  83            report.header.app_version
  84        );
  85        return Ok(());
  86    }
  87    let app_version = app_version.unwrap();
  88
  89    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  90        let response = blob_store_client
  91            .head_object()
  92            .bucket(CRASH_REPORTS_BUCKET)
  93            .key(report.header.incident_id.clone() + ".ips")
  94            .send()
  95            .await;
  96
  97        if response.is_ok() {
  98            log::info!("We've already uploaded this crash");
  99            return Ok(());
 100        }
 101
 102        blob_store_client
 103            .put_object()
 104            .bucket(CRASH_REPORTS_BUCKET)
 105            .key(report.header.incident_id.clone() + ".ips")
 106            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 107            .body(ByteStream::from(body.to_vec()))
 108            .send()
 109            .await
 110            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 111            .ok();
 112    }
 113
 114    let recent_panic_on: Option<i64> = headers
 115        .get("x-zed-panicked-on")
 116        .and_then(|h| h.to_str().ok())
 117        .and_then(|s| s.parse().ok());
 118
 119    let installation_id = headers
 120        .get("x-zed-installation-id")
 121        .and_then(|h| h.to_str().ok())
 122        .map(|s| s.to_string())
 123        .unwrap_or_default();
 124
 125    let mut recent_panic = None;
 126
 127    if let Some(recent_panic_on) = recent_panic_on {
 128        let crashed_at = match report.timestamp() {
 129            Ok(t) => Some(t),
 130            Err(e) => {
 131                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 132                None
 133            }
 134        };
 135        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 136            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 137        }
 138    }
 139
 140    let description = report.description(recent_panic);
 141    let summary = report.backtrace_summary();
 142
 143    tracing::error!(
 144        service = "client",
 145        version = %report.header.app_version,
 146        os_version = %report.header.os_version,
 147        bundle_id = %report.header.bundle_id,
 148        incident_id = %report.header.incident_id,
 149        installation_id = %installation_id,
 150        description = %description,
 151        backtrace = %summary,
 152        "crash report"
 153    );
 154
 155    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 156        let payload = slack::WebhookBody::new(|w| {
 157            w.add_section(|s| s.text(slack::Text::markdown(description)))
 158                .add_section(|s| {
 159                    s.add_field(slack::Text::markdown(format!(
 160                        "*Version:*\n{} ({})",
 161                        bundle_id, app_version
 162                    )))
 163                    .add_field({
 164                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 165                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 166                            hostname.strip_prefix("http://").unwrap_or_default()
 167                        });
 168
 169                        slack::Text::markdown(format!(
 170                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 171                            CRASH_REPORTS_BUCKET,
 172                            hostname,
 173                            report.header.incident_id,
 174                            report
 175                                .header
 176                                .incident_id
 177                                .chars()
 178                                .take(8)
 179                                .collect::<String>(),
 180                        ))
 181                    })
 182                })
 183                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 184        });
 185        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 186            log::error!("Failed to serialize payload to JSON: {err}");
 187            Error::Internal(anyhow!(err))
 188        })?;
 189
 190        reqwest::Client::new()
 191            .post(slack_panics_webhook)
 192            .header("Content-Type", "application/json")
 193            .body(payload_json)
 194            .send()
 195            .await
 196            .map_err(|err| {
 197                log::error!("Failed to send payload to Slack: {err}");
 198                Error::Internal(anyhow!(err))
 199            })?;
 200    }
 201
 202    Ok(())
 203}
 204
 205pub async fn post_hang(
 206    Extension(app): Extension<Arc<AppState>>,
 207    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 208    body: Bytes,
 209) -> Result<()> {
 210    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 211        return Err(Error::http(
 212            StatusCode::INTERNAL_SERVER_ERROR,
 213            "events not enabled".into(),
 214        ))?;
 215    };
 216
 217    if checksum != expected {
 218        return Err(Error::http(
 219            StatusCode::BAD_REQUEST,
 220            "invalid checksum".into(),
 221        ))?;
 222    }
 223
 224    let incident_id = Uuid::new_v4().to_string();
 225
 226    // dump JSON into S3 so we can get frame offsets if we need to.
 227    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 228        blob_store_client
 229            .put_object()
 230            .bucket(CRASH_REPORTS_BUCKET)
 231            .key(incident_id.clone() + ".hang.json")
 232            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 233            .body(ByteStream::from(body.to_vec()))
 234            .send()
 235            .await
 236            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 237            .ok();
 238    }
 239
 240    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 241        log::error!("can't parse report json: {err}");
 242        Error::Internal(anyhow!(err))
 243    })?;
 244
 245    let mut backtrace = "Possible hang detected on main thread:".to_string();
 246    let unknown = "<unknown>".to_string();
 247    for frame in report.backtrace.iter() {
 248        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 249    }
 250
 251    tracing::error!(
 252        service = "client",
 253        version = %report.app_version.unwrap_or_default().to_string(),
 254        os_name = %report.os_name,
 255        os_version = report.os_version.unwrap_or_default().to_string(),
 256        incident_id = %incident_id,
 257        installation_id = %report.installation_id.unwrap_or_default(),
 258        backtrace = %backtrace,
 259        "hang report");
 260
 261    Ok(())
 262}
 263
 264pub async fn post_panic(
 265    Extension(app): Extension<Arc<AppState>>,
 266    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 267    body: Bytes,
 268) -> Result<()> {
 269    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 270        return Err(Error::http(
 271            StatusCode::INTERNAL_SERVER_ERROR,
 272            "events not enabled".into(),
 273        ))?;
 274    };
 275
 276    if checksum != expected {
 277        return Err(Error::http(
 278            StatusCode::BAD_REQUEST,
 279            "invalid checksum".into(),
 280        ))?;
 281    }
 282
 283    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 284        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 285    let panic = report.panic;
 286
 287    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 288        return Err(Error::http(
 289            StatusCode::BAD_REQUEST,
 290            "invalid os version".into(),
 291        ))?;
 292    }
 293
 294    tracing::error!(
 295        service = "client",
 296        version = %panic.app_version,
 297        os_name = %panic.os_name,
 298        os_version = %panic.os_version.clone().unwrap_or_default(),
 299        installation_id = %panic.installation_id.clone().unwrap_or_default(),
 300        description = %panic.payload,
 301        backtrace = %panic.backtrace.join("\n"),
 302        "panic report"
 303    );
 304
 305    let backtrace = if panic.backtrace.len() > 25 {
 306        let total = panic.backtrace.len();
 307        format!(
 308            "{}\n   and {} more",
 309            panic
 310                .backtrace
 311                .iter()
 312                .take(20)
 313                .cloned()
 314                .collect::<Vec<_>>()
 315                .join("\n"),
 316            total - 20
 317        )
 318    } else {
 319        panic.backtrace.join("\n")
 320    };
 321
 322    if !report_to_slack(&panic) {
 323        return Ok(());
 324    }
 325
 326    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 327
 328    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 329        let payload = slack::WebhookBody::new(|w| {
 330            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 331                .add_section(|s| {
 332                    s.add_field(slack::Text::markdown(format!(
 333                        "*Version:*\n {} ",
 334                        panic.app_version
 335                    )))
 336                    .add_field({
 337                        slack::Text::markdown(format!(
 338                            "*OS:*\n{} {}",
 339                            panic.os_name,
 340                            panic.os_version.unwrap_or_default()
 341                        ))
 342                    })
 343                })
 344                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 345        });
 346        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 347            log::error!("Failed to serialize payload to JSON: {err}");
 348            Error::Internal(anyhow!(err))
 349        })?;
 350
 351        reqwest::Client::new()
 352            .post(slack_panics_webhook)
 353            .header("Content-Type", "application/json")
 354            .body(payload_json)
 355            .send()
 356            .await
 357            .map_err(|err| {
 358                log::error!("Failed to send payload to Slack: {err}");
 359                Error::Internal(anyhow!(err))
 360            })?;
 361    }
 362
 363    Ok(())
 364}
 365
 366fn report_to_slack(panic: &Panic) -> bool {
 367    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
 368        return false;
 369    }
 370
 371    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
 372        return false;
 373    }
 374
 375    if panic
 376        .payload
 377        .contains("GPU has crashed, and no debug information is available")
 378    {
 379        return false;
 380    }
 381
 382    true
 383}
 384
 385pub async fn post_events(
 386    Extension(app): Extension<Arc<AppState>>,
 387    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 388    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 389    body: Bytes,
 390) -> Result<()> {
 391    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 392        Err(Error::http(
 393            StatusCode::NOT_IMPLEMENTED,
 394            "not supported".into(),
 395        ))?
 396    };
 397
 398    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 399        return Err(Error::http(
 400            StatusCode::INTERNAL_SERVER_ERROR,
 401            "events not enabled".into(),
 402        ))?;
 403    };
 404
 405    let checksum_matched = checksum == expected;
 406
 407    let request_body: telemetry_events::EventRequestBody =
 408        serde_json::from_slice(&body).map_err(|err| {
 409            log::error!("can't parse event json: {err}");
 410            Error::Internal(anyhow!(err))
 411        })?;
 412
 413    let mut to_upload = ToUpload::default();
 414    let Some(last_event) = request_body.events.last() else {
 415        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 416    };
 417    let country_code = country_code_header.map(|h| h.to_string());
 418
 419    let first_event_at = chrono::Utc::now()
 420        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 421
 422    for wrapper in &request_body.events {
 423        match &wrapper.event {
 424            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 425                event.clone(),
 426                wrapper,
 427                &request_body,
 428                first_event_at,
 429                country_code.clone(),
 430                checksum_matched,
 431            )),
 432            // Needed for clients sending old copilot_event types
 433            Event::Copilot(_) => {}
 434            Event::InlineCompletion(event) => {
 435                to_upload
 436                    .inline_completion_events
 437                    .push(InlineCompletionEventRow::from_event(
 438                        event.clone(),
 439                        wrapper,
 440                        &request_body,
 441                        first_event_at,
 442                        country_code.clone(),
 443                        checksum_matched,
 444                    ))
 445            }
 446            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 447                event.clone(),
 448                wrapper,
 449                &request_body,
 450                first_event_at,
 451                checksum_matched,
 452            )),
 453            Event::Assistant(event) => {
 454                to_upload
 455                    .assistant_events
 456                    .push(AssistantEventRow::from_event(
 457                        event.clone(),
 458                        wrapper,
 459                        &request_body,
 460                        first_event_at,
 461                        checksum_matched,
 462                    ))
 463            }
 464            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 465                event.clone(),
 466                wrapper,
 467                &request_body,
 468                first_event_at,
 469                checksum_matched,
 470            )),
 471            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 472                event.clone(),
 473                wrapper,
 474                &request_body,
 475                first_event_at,
 476                checksum_matched,
 477            )),
 478            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 479                event.clone(),
 480                wrapper,
 481                &request_body,
 482                first_event_at,
 483                checksum_matched,
 484            )),
 485            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 486                event.clone(),
 487                wrapper,
 488                &request_body,
 489                first_event_at,
 490                checksum_matched,
 491            )),
 492            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 493                event.clone(),
 494                wrapper,
 495                &request_body,
 496                first_event_at,
 497                checksum_matched,
 498            )),
 499            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 500                event.clone(),
 501                wrapper,
 502                &request_body,
 503                first_event_at,
 504                checksum_matched,
 505            )),
 506            Event::Extension(event) => {
 507                let metadata = app
 508                    .db
 509                    .get_extension_version(&event.extension_id, &event.version)
 510                    .await?;
 511                to_upload
 512                    .extension_events
 513                    .push(ExtensionEventRow::from_event(
 514                        event.clone(),
 515                        wrapper,
 516                        &request_body,
 517                        metadata,
 518                        first_event_at,
 519                        checksum_matched,
 520                    ))
 521            }
 522            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 523                event.clone(),
 524                wrapper,
 525                &request_body,
 526                first_event_at,
 527                checksum_matched,
 528            )),
 529        }
 530    }
 531
 532    to_upload
 533        .upload(&clickhouse_client)
 534        .await
 535        .map_err(|err| Error::Internal(anyhow!(err)))?;
 536
 537    Ok(())
 538}
 539
 540#[derive(Default)]
 541struct ToUpload {
 542    editor_events: Vec<EditorEventRow>,
 543    inline_completion_events: Vec<InlineCompletionEventRow>,
 544    assistant_events: Vec<AssistantEventRow>,
 545    call_events: Vec<CallEventRow>,
 546    cpu_events: Vec<CpuEventRow>,
 547    memory_events: Vec<MemoryEventRow>,
 548    app_events: Vec<AppEventRow>,
 549    setting_events: Vec<SettingEventRow>,
 550    extension_events: Vec<ExtensionEventRow>,
 551    edit_events: Vec<EditEventRow>,
 552    action_events: Vec<ActionEventRow>,
 553    repl_events: Vec<ReplEventRow>,
 554}
 555
 556impl ToUpload {
 557    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 558        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 559        write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 560            .await
 561            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 562
 563        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 564        write_to_table(
 565            INLINE_COMPLETION_EVENTS_TABLE,
 566            &self.inline_completion_events,
 567            clickhouse_client,
 568        )
 569        .await
 570        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 571
 572        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 573        write_to_table(
 574            ASSISTANT_EVENTS_TABLE,
 575            &self.assistant_events,
 576            clickhouse_client,
 577        )
 578        .await
 579        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 580
 581        const CALL_EVENTS_TABLE: &str = "call_events";
 582        write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 583            .await
 584            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 585
 586        const CPU_EVENTS_TABLE: &str = "cpu_events";
 587        write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 588            .await
 589            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 590
 591        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 592        write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 593            .await
 594            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 595
 596        const APP_EVENTS_TABLE: &str = "app_events";
 597        write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 598            .await
 599            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 600
 601        const SETTING_EVENTS_TABLE: &str = "setting_events";
 602        write_to_table(
 603            SETTING_EVENTS_TABLE,
 604            &self.setting_events,
 605            clickhouse_client,
 606        )
 607        .await
 608        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 609
 610        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 611        write_to_table(
 612            EXTENSION_EVENTS_TABLE,
 613            &self.extension_events,
 614            clickhouse_client,
 615        )
 616        .await
 617        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 618
 619        const EDIT_EVENTS_TABLE: &str = "edit_events";
 620        write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 621            .await
 622            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 623
 624        const ACTION_EVENTS_TABLE: &str = "action_events";
 625        write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 626            .await
 627            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 628
 629        const REPL_EVENTS_TABLE: &str = "repl_events";
 630        write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 631            .await
 632            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 633
 634        Ok(())
 635    }
 636}
 637
 638pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 639where
 640    S: Serializer,
 641{
 642    if country_code.len() != 2 {
 643        use serde::ser::Error;
 644        return Err(S::Error::custom(
 645            "country_code must be exactly 2 characters",
 646        ));
 647    }
 648
 649    let country_code = country_code.as_bytes();
 650
 651    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 652}
 653
 654#[derive(Serialize, Debug, clickhouse::Row)]
 655pub struct EditorEventRow {
 656    system_id: String,
 657    installation_id: String,
 658    session_id: Option<String>,
 659    metrics_id: String,
 660    operation: String,
 661    app_version: String,
 662    file_extension: String,
 663    os_name: String,
 664    os_version: String,
 665    release_channel: String,
 666    signed_in: bool,
 667    vim_mode: bool,
 668    #[serde(serialize_with = "serialize_country_code")]
 669    country_code: String,
 670    region_code: String,
 671    city: String,
 672    time: i64,
 673    copilot_enabled: bool,
 674    copilot_enabled_for_language: bool,
 675    historical_event: bool,
 676    architecture: String,
 677    is_staff: Option<bool>,
 678    major: Option<i32>,
 679    minor: Option<i32>,
 680    patch: Option<i32>,
 681    checksum_matched: bool,
 682    is_via_ssh: bool,
 683}
 684
 685impl EditorEventRow {
 686    fn from_event(
 687        event: EditorEvent,
 688        wrapper: &EventWrapper,
 689        body: &EventRequestBody,
 690        first_event_at: chrono::DateTime<chrono::Utc>,
 691        country_code: Option<String>,
 692        checksum_matched: bool,
 693    ) -> Self {
 694        let semver = body.semver();
 695        let time =
 696            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 697
 698        Self {
 699            app_version: body.app_version.clone(),
 700            major: semver.map(|v| v.major() as i32),
 701            minor: semver.map(|v| v.minor() as i32),
 702            patch: semver.map(|v| v.patch() as i32),
 703            checksum_matched,
 704            release_channel: body.release_channel.clone().unwrap_or_default(),
 705            os_name: body.os_name.clone(),
 706            os_version: body.os_version.clone().unwrap_or_default(),
 707            architecture: body.architecture.clone(),
 708            system_id: body.system_id.clone().unwrap_or_default(),
 709            installation_id: body.installation_id.clone().unwrap_or_default(),
 710            session_id: body.session_id.clone(),
 711            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 712            is_staff: body.is_staff,
 713            time: time.timestamp_millis(),
 714            operation: event.operation,
 715            file_extension: event.file_extension.unwrap_or_default(),
 716            signed_in: wrapper.signed_in,
 717            vim_mode: event.vim_mode,
 718            copilot_enabled: event.copilot_enabled,
 719            copilot_enabled_for_language: event.copilot_enabled_for_language,
 720            country_code: country_code.unwrap_or("XX".to_string()),
 721            region_code: "".to_string(),
 722            city: "".to_string(),
 723            historical_event: false,
 724            is_via_ssh: event.is_via_ssh,
 725        }
 726    }
 727}
 728
 729#[derive(Serialize, Debug, clickhouse::Row)]
 730pub struct InlineCompletionEventRow {
 731    installation_id: String,
 732    session_id: Option<String>,
 733    provider: String,
 734    suggestion_accepted: bool,
 735    app_version: String,
 736    file_extension: String,
 737    os_name: String,
 738    os_version: String,
 739    release_channel: String,
 740    signed_in: bool,
 741    #[serde(serialize_with = "serialize_country_code")]
 742    country_code: String,
 743    region_code: String,
 744    city: String,
 745    time: i64,
 746    is_staff: Option<bool>,
 747    major: Option<i32>,
 748    minor: Option<i32>,
 749    patch: Option<i32>,
 750    checksum_matched: bool,
 751}
 752
 753impl InlineCompletionEventRow {
 754    fn from_event(
 755        event: InlineCompletionEvent,
 756        wrapper: &EventWrapper,
 757        body: &EventRequestBody,
 758        first_event_at: chrono::DateTime<chrono::Utc>,
 759        country_code: Option<String>,
 760        checksum_matched: bool,
 761    ) -> Self {
 762        let semver = body.semver();
 763        let time =
 764            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 765
 766        Self {
 767            app_version: body.app_version.clone(),
 768            major: semver.map(|v| v.major() as i32),
 769            minor: semver.map(|v| v.minor() as i32),
 770            patch: semver.map(|v| v.patch() as i32),
 771            checksum_matched,
 772            release_channel: body.release_channel.clone().unwrap_or_default(),
 773            os_name: body.os_name.clone(),
 774            os_version: body.os_version.clone().unwrap_or_default(),
 775            installation_id: body.installation_id.clone().unwrap_or_default(),
 776            session_id: body.session_id.clone(),
 777            is_staff: body.is_staff,
 778            time: time.timestamp_millis(),
 779            file_extension: event.file_extension.unwrap_or_default(),
 780            signed_in: wrapper.signed_in,
 781            country_code: country_code.unwrap_or("XX".to_string()),
 782            region_code: "".to_string(),
 783            city: "".to_string(),
 784            provider: event.provider,
 785            suggestion_accepted: event.suggestion_accepted,
 786        }
 787    }
 788}
 789
 790#[derive(Serialize, Debug, clickhouse::Row)]
 791pub struct CallEventRow {
 792    // AppInfoBase
 793    app_version: String,
 794    major: Option<i32>,
 795    minor: Option<i32>,
 796    patch: Option<i32>,
 797    release_channel: String,
 798    os_name: String,
 799    os_version: String,
 800    checksum_matched: bool,
 801
 802    // ClientEventBase
 803    installation_id: String,
 804    session_id: Option<String>,
 805    is_staff: Option<bool>,
 806    time: i64,
 807
 808    // CallEventRow
 809    operation: String,
 810    room_id: Option<u64>,
 811    channel_id: Option<u64>,
 812}
 813
 814impl CallEventRow {
 815    fn from_event(
 816        event: CallEvent,
 817        wrapper: &EventWrapper,
 818        body: &EventRequestBody,
 819        first_event_at: chrono::DateTime<chrono::Utc>,
 820        checksum_matched: bool,
 821    ) -> Self {
 822        let semver = body.semver();
 823        let time =
 824            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 825
 826        Self {
 827            app_version: body.app_version.clone(),
 828            major: semver.map(|v| v.major() as i32),
 829            minor: semver.map(|v| v.minor() as i32),
 830            patch: semver.map(|v| v.patch() as i32),
 831            checksum_matched,
 832            release_channel: body.release_channel.clone().unwrap_or_default(),
 833            os_name: body.os_name.clone(),
 834            os_version: body.os_version.clone().unwrap_or_default(),
 835            installation_id: body.installation_id.clone().unwrap_or_default(),
 836            session_id: body.session_id.clone(),
 837            is_staff: body.is_staff,
 838            time: time.timestamp_millis(),
 839            operation: event.operation,
 840            room_id: event.room_id,
 841            channel_id: event.channel_id,
 842        }
 843    }
 844}
 845
/// Flattened record for a single `AssistantEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `AssistantEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AssistantEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,

    // AssistantEventRow
    /// Empty string when the event has no conversation id.
    conversation_id: String,
    /// String form (`to_string`) of the event's `kind`.
    kind: String,
    /// String form (`to_string`) of the event's `phase`.
    phase: String,
    model: String,
    /// Response latency converted to whole milliseconds, when present.
    response_latency_in_ms: Option<i64>,
    error_message: Option<String>,
}
 872
 873impl AssistantEventRow {
 874    fn from_event(
 875        event: AssistantEvent,
 876        wrapper: &EventWrapper,
 877        body: &EventRequestBody,
 878        first_event_at: chrono::DateTime<chrono::Utc>,
 879        checksum_matched: bool,
 880    ) -> Self {
 881        let semver = body.semver();
 882        let time =
 883            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 884
 885        Self {
 886            app_version: body.app_version.clone(),
 887            major: semver.map(|v| v.major() as i32),
 888            minor: semver.map(|v| v.minor() as i32),
 889            patch: semver.map(|v| v.patch() as i32),
 890            checksum_matched,
 891            release_channel: body.release_channel.clone().unwrap_or_default(),
 892            os_name: body.os_name.clone(),
 893            os_version: body.os_version.clone().unwrap_or_default(),
 894            installation_id: body.installation_id.clone(),
 895            session_id: body.session_id.clone(),
 896            is_staff: body.is_staff,
 897            time: time.timestamp_millis(),
 898            conversation_id: event.conversation_id.unwrap_or_default(),
 899            kind: event.kind.to_string(),
 900            phase: event.phase.to_string(),
 901            model: event.model,
 902            response_latency_in_ms: event
 903                .response_latency
 904                .map(|latency| latency.as_millis() as i64),
 905            error_message: event.error_message,
 906        }
 907    }
 908}
 909
/// Flattened record for a single `CpuEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `CpuEventRow::from_event`. Note that the field order here
/// differs from the other row structs in this file.
#[derive(Debug, clickhouse::Row, Serialize)]
pub struct CpuEventRow {
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// CPU usage copied from the event.
    usage_as_percentage: f32,
    /// Core count copied from the event.
    core_count: u32,
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,
    // pub normalized_cpu_usage: f64, MATERIALIZED
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
}
 928
 929impl CpuEventRow {
 930    fn from_event(
 931        event: CpuEvent,
 932        wrapper: &EventWrapper,
 933        body: &EventRequestBody,
 934        first_event_at: chrono::DateTime<chrono::Utc>,
 935        checksum_matched: bool,
 936    ) -> Self {
 937        let semver = body.semver();
 938        let time =
 939            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 940
 941        Self {
 942            app_version: body.app_version.clone(),
 943            major: semver.map(|v| v.major() as i32),
 944            minor: semver.map(|v| v.minor() as i32),
 945            patch: semver.map(|v| v.patch() as i32),
 946            checksum_matched,
 947            release_channel: body.release_channel.clone().unwrap_or_default(),
 948            os_name: body.os_name.clone(),
 949            os_version: body.os_version.clone().unwrap_or_default(),
 950            installation_id: body.installation_id.clone(),
 951            session_id: body.session_id.clone(),
 952            is_staff: body.is_staff,
 953            time: time.timestamp_millis(),
 954            usage_as_percentage: event.usage_as_percentage,
 955            core_count: event.core_count,
 956        }
 957    }
 958}
 959
/// Flattened record for a single `MemoryEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `MemoryEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct MemoryEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,

    // MemoryEventRow
    /// Memory usage in bytes, copied from the event.
    memory_in_bytes: u64,
    /// Virtual memory usage in bytes, copied from the event.
    virtual_memory_in_bytes: u64,
}
 982
 983impl MemoryEventRow {
 984    fn from_event(
 985        event: MemoryEvent,
 986        wrapper: &EventWrapper,
 987        body: &EventRequestBody,
 988        first_event_at: chrono::DateTime<chrono::Utc>,
 989        checksum_matched: bool,
 990    ) -> Self {
 991        let semver = body.semver();
 992        let time =
 993            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 994
 995        Self {
 996            app_version: body.app_version.clone(),
 997            major: semver.map(|v| v.major() as i32),
 998            minor: semver.map(|v| v.minor() as i32),
 999            patch: semver.map(|v| v.patch() as i32),
1000            checksum_matched,
1001            release_channel: body.release_channel.clone().unwrap_or_default(),
1002            os_name: body.os_name.clone(),
1003            os_version: body.os_version.clone().unwrap_or_default(),
1004            installation_id: body.installation_id.clone(),
1005            session_id: body.session_id.clone(),
1006            is_staff: body.is_staff,
1007            time: time.timestamp_millis(),
1008            memory_in_bytes: event.memory_in_bytes,
1009            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1010        }
1011    }
1012}
1013
/// Flattened record for a single `AppEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `AppEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AppEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,

    // AppEventRow
    /// Operation name copied from the event.
    operation: String,
}
1035
1036impl AppEventRow {
1037    fn from_event(
1038        event: AppEvent,
1039        wrapper: &EventWrapper,
1040        body: &EventRequestBody,
1041        first_event_at: chrono::DateTime<chrono::Utc>,
1042        checksum_matched: bool,
1043    ) -> Self {
1044        let semver = body.semver();
1045        let time =
1046            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1047
1048        Self {
1049            app_version: body.app_version.clone(),
1050            major: semver.map(|v| v.major() as i32),
1051            minor: semver.map(|v| v.minor() as i32),
1052            patch: semver.map(|v| v.patch() as i32),
1053            checksum_matched,
1054            release_channel: body.release_channel.clone().unwrap_or_default(),
1055            os_name: body.os_name.clone(),
1056            os_version: body.os_version.clone().unwrap_or_default(),
1057            installation_id: body.installation_id.clone(),
1058            session_id: body.session_id.clone(),
1059            is_staff: body.is_staff,
1060            time: time.timestamp_millis(),
1061            operation: event.operation,
1062        }
1063    }
1064}
1065
/// Flattened record for a single `SettingEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `SettingEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,
    // SettingEventRow
    /// Setting name copied from the event.
    setting: String,
    /// Setting value copied from the event.
    value: String,
}
1087
1088impl SettingEventRow {
1089    fn from_event(
1090        event: SettingEvent,
1091        wrapper: &EventWrapper,
1092        body: &EventRequestBody,
1093        first_event_at: chrono::DateTime<chrono::Utc>,
1094        checksum_matched: bool,
1095    ) -> Self {
1096        let semver = body.semver();
1097        let time =
1098            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1099
1100        Self {
1101            app_version: body.app_version.clone(),
1102            major: semver.map(|v| v.major() as i32),
1103            minor: semver.map(|v| v.minor() as i32),
1104            checksum_matched,
1105            patch: semver.map(|v| v.patch() as i32),
1106            release_channel: body.release_channel.clone().unwrap_or_default(),
1107            os_name: body.os_name.clone(),
1108            os_version: body.os_version.clone().unwrap_or_default(),
1109            installation_id: body.installation_id.clone(),
1110            session_id: body.session_id.clone(),
1111            is_staff: body.is_staff,
1112            time: time.timestamp_millis(),
1113            setting: event.setting,
1114            value: event.value,
1115        }
1116    }
1117}
1118
/// Flattened record for a single `ExtensionEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `ExtensionEventRow::from_event`, which also takes optional
/// `ExtensionMetadata` for the extension.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,

    // ExtensionEventRow
    /// Extension id copied from the event.
    extension_id: Arc<str>,
    /// The event's `version` field.
    extension_version: Arc<str>,
    /// `true` when no `ExtensionMetadata` was supplied for the extension.
    dev: bool,
    /// Manifest schema version, when metadata is available and specifies one.
    schema_version: Option<i32>,
    /// String form of the manifest's wasm API version, when metadata is
    /// available and specifies one.
    wasm_api_version: Option<String>,
}
1144
1145impl ExtensionEventRow {
1146    fn from_event(
1147        event: ExtensionEvent,
1148        wrapper: &EventWrapper,
1149        body: &EventRequestBody,
1150        extension_metadata: Option<ExtensionMetadata>,
1151        first_event_at: chrono::DateTime<chrono::Utc>,
1152        checksum_matched: bool,
1153    ) -> Self {
1154        let semver = body.semver();
1155        let time =
1156            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1157
1158        Self {
1159            app_version: body.app_version.clone(),
1160            major: semver.map(|v| v.major() as i32),
1161            minor: semver.map(|v| v.minor() as i32),
1162            patch: semver.map(|v| v.patch() as i32),
1163            checksum_matched,
1164            release_channel: body.release_channel.clone().unwrap_or_default(),
1165            os_name: body.os_name.clone(),
1166            os_version: body.os_version.clone().unwrap_or_default(),
1167            installation_id: body.installation_id.clone(),
1168            session_id: body.session_id.clone(),
1169            is_staff: body.is_staff,
1170            time: time.timestamp_millis(),
1171            extension_id: event.extension_id,
1172            extension_version: event.version,
1173            dev: extension_metadata.is_none(),
1174            schema_version: extension_metadata
1175                .as_ref()
1176                .and_then(|metadata| metadata.manifest.schema_version),
1177            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1178                metadata
1179                    .manifest
1180                    .wasm_api_version
1181                    .as_ref()
1182                    .map(|version| version.to_string())
1183            }),
1184        }
1185    }
1186}
1187
/// Flattened record for a single `ReplEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `ReplEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ReplEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,

    // ReplEventRow
    /// Kernel language copied from the event.
    kernel_language: String,
    /// Kernel status copied from the event.
    kernel_status: String,
    /// REPL session id copied from the event (distinct from `session_id`,
    /// which comes from the request body).
    repl_session_id: String,
}
1211
1212impl ReplEventRow {
1213    fn from_event(
1214        event: ReplEvent,
1215        wrapper: &EventWrapper,
1216        body: &EventRequestBody,
1217        first_event_at: chrono::DateTime<chrono::Utc>,
1218        checksum_matched: bool,
1219    ) -> Self {
1220        let semver = body.semver();
1221        let time =
1222            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1223
1224        Self {
1225            app_version: body.app_version.clone(),
1226            major: semver.map(|v| v.major() as i32),
1227            minor: semver.map(|v| v.minor() as i32),
1228            patch: semver.map(|v| v.patch() as i32),
1229            checksum_matched,
1230            release_channel: body.release_channel.clone().unwrap_or_default(),
1231            os_name: body.os_name.clone(),
1232            os_version: body.os_version.clone().unwrap_or_default(),
1233            installation_id: body.installation_id.clone(),
1234            session_id: body.session_id.clone(),
1235            is_staff: body.is_staff,
1236            time: time.timestamp_millis(),
1237            kernel_language: event.kernel_language,
1238            kernel_status: event.kernel_status,
1239            repl_session_id: event.repl_session_id,
1240        }
1241    }
1242}
1243
/// Flattened record for a single `EditEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `EditEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,

    // EditEventRow
    /// Millisecond timestamp of the event time minus the event's `duration`.
    period_start: i64,
    /// Millisecond timestamp of the event time (same instant as `time`).
    period_end: i64,
    /// Environment string copied from the event.
    environment: String,
}
1269
1270impl EditEventRow {
1271    fn from_event(
1272        event: EditEvent,
1273        wrapper: &EventWrapper,
1274        body: &EventRequestBody,
1275        first_event_at: chrono::DateTime<chrono::Utc>,
1276        checksum_matched: bool,
1277    ) -> Self {
1278        let semver = body.semver();
1279        let time =
1280            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1281
1282        let period_start = time - chrono::Duration::milliseconds(event.duration);
1283        let period_end = time;
1284
1285        Self {
1286            app_version: body.app_version.clone(),
1287            major: semver.map(|v| v.major() as i32),
1288            minor: semver.map(|v| v.minor() as i32),
1289            patch: semver.map(|v| v.patch() as i32),
1290            checksum_matched,
1291            release_channel: body.release_channel.clone().unwrap_or_default(),
1292            os_name: body.os_name.clone(),
1293            os_version: body.os_version.clone().unwrap_or_default(),
1294            installation_id: body.installation_id.clone(),
1295            session_id: body.session_id.clone(),
1296            is_staff: body.is_staff,
1297            time: time.timestamp_millis(),
1298            period_start: period_start.timestamp_millis(),
1299            period_end: period_end.timestamp_millis(),
1300            environment: event.environment,
1301        }
1302    }
1303}
1304
/// Flattened record for a single `ActionEvent`, deriving `Serialize` and
/// `clickhouse::Row`.
///
/// Populated by `ActionEventRow::from_event`.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    /// App version string copied verbatim from the request body.
    app_version: String,
    /// Version components taken from `EventRequestBody::semver`; `None` when
    /// that call returns `None`.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    /// Checksum verification result, passed in by the caller.
    checksum_matched: bool,
    /// Empty string when the request body carries no release channel.
    release_channel: String,
    os_name: String,
    /// Empty string when the request body carries no OS version.
    os_version: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    /// Event time as a millisecond timestamp (`DateTime::timestamp_millis`).
    time: i64,
    // ActionEventRow
    /// Source copied from the event.
    source: String,
    /// Action name copied from the event.
    action: String,
}
1328
1329impl ActionEventRow {
1330    fn from_event(
1331        event: ActionEvent,
1332        wrapper: &EventWrapper,
1333        body: &EventRequestBody,
1334        first_event_at: chrono::DateTime<chrono::Utc>,
1335        checksum_matched: bool,
1336    ) -> Self {
1337        let semver = body.semver();
1338        let time =
1339            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1340
1341        Self {
1342            app_version: body.app_version.clone(),
1343            major: semver.map(|v| v.major() as i32),
1344            minor: semver.map(|v| v.minor() as i32),
1345            patch: semver.map(|v| v.patch() as i32),
1346            checksum_matched,
1347            release_channel: body.release_channel.clone().unwrap_or_default(),
1348            os_name: body.os_name.clone(),
1349            os_version: body.os_version.clone().unwrap_or_default(),
1350            installation_id: body.installation_id.clone(),
1351            session_id: body.session_id.clone(),
1352            is_staff: body.is_staff,
1353            time: time.timestamp_millis(),
1354            source: event.source,
1355            action: event.action,
1356        }
1357    }
1358}
1359
1360pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1361    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
1362
1363    let mut summer = Sha256::new();
1364    summer.update(checksum_seed);
1365    summer.update(json);
1366    summer.update(checksum_seed);
1367    Some(summer.finalize().into_iter().collect())
1368}