events.rs

   1use super::ips_file::IpsFile;
   2use crate::api::CloudflareIpCountryHeader;
   3use crate::{api::slack, AppState, Error, Result};
   4use anyhow::{anyhow, Context};
   5use aws_sdk_s3::primitives::ByteStream;
   6use axum::{
   7    body::Bytes,
   8    headers::Header,
   9    http::{HeaderMap, HeaderName, StatusCode},
  10    routing::post,
  11    Extension, Router, TypedHeader,
  12};
  13use rpc::ExtensionMetadata;
  14use semantic_version::SemanticVersion;
  15use serde::{Serialize, Serializer};
  16use sha2::{Digest, Sha256};
  17use std::sync::{Arc, OnceLock};
  18use telemetry_events::{
  19    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  20    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
  21    SettingEvent,
  22};
  23use uuid::Uuid;
  24
/// Name of the blob-store bucket that crash (`.ips`) and hang (`.hang.json`)
/// reports are uploaded to.
static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  26
  27pub fn router() -> Router {
  28    Router::new()
  29        .route("/telemetry/events", post(post_events))
  30        .route("/telemetry/crashes", post(post_crash))
  31        .route("/telemetry/panics", post(post_panic))
  32        .route("/telemetry/hangs", post(post_hang))
  33}
  34
  35pub struct ZedChecksumHeader(Vec<u8>);
  36
  37impl Header for ZedChecksumHeader {
  38    fn name() -> &'static HeaderName {
  39        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  40        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  41    }
  42
  43    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  44    where
  45        Self: Sized,
  46        I: Iterator<Item = &'i axum::http::HeaderValue>,
  47    {
  48        let checksum = values
  49            .next()
  50            .ok_or_else(axum::headers::Error::invalid)?
  51            .to_str()
  52            .map_err(|_| axum::headers::Error::invalid())?;
  53
  54        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  55        Ok(Self(bytes))
  56    }
  57
  58    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  59        unimplemented!()
  60    }
  61}
  62
  63pub async fn post_crash(
  64    Extension(app): Extension<Arc<AppState>>,
  65    headers: HeaderMap,
  66    body: Bytes,
  67) -> Result<()> {
  68    let report = IpsFile::parse(&body)?;
  69    let version_threshold = SemanticVersion::new(0, 123, 0);
  70
  71    let bundle_id = &report.header.bundle_id;
  72    let app_version = &report.app_version();
  73
  74    if bundle_id == "dev.zed.Zed-Dev" {
  75        log::error!("Crash uploads from {} are ignored.", bundle_id);
  76        return Ok(());
  77    }
  78
  79    if app_version.is_none() || app_version.unwrap() < version_threshold {
  80        log::error!(
  81            "Crash uploads from {} are ignored.",
  82            report.header.app_version
  83        );
  84        return Ok(());
  85    }
  86    let app_version = app_version.unwrap();
  87
  88    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
  89        let response = blob_store_client
  90            .head_object()
  91            .bucket(CRASH_REPORTS_BUCKET)
  92            .key(report.header.incident_id.clone() + ".ips")
  93            .send()
  94            .await;
  95
  96        if response.is_ok() {
  97            log::info!("We've already uploaded this crash");
  98            return Ok(());
  99        }
 100
 101        blob_store_client
 102            .put_object()
 103            .bucket(CRASH_REPORTS_BUCKET)
 104            .key(report.header.incident_id.clone() + ".ips")
 105            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 106            .body(ByteStream::from(body.to_vec()))
 107            .send()
 108            .await
 109            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 110            .ok();
 111    }
 112
 113    let recent_panic_on: Option<i64> = headers
 114        .get("x-zed-panicked-on")
 115        .and_then(|h| h.to_str().ok())
 116        .and_then(|s| s.parse().ok());
 117
 118    let installation_id = headers
 119        .get("x-zed-installation-id")
 120        .and_then(|h| h.to_str().ok())
 121        .map(|s| s.to_string())
 122        .unwrap_or_default();
 123
 124    let mut recent_panic = None;
 125
 126    if let Some(recent_panic_on) = recent_panic_on {
 127        let crashed_at = match report.timestamp() {
 128            Ok(t) => Some(t),
 129            Err(e) => {
 130                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 131                None
 132            }
 133        };
 134        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 135            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 136        }
 137    }
 138
 139    let description = report.description(recent_panic);
 140    let summary = report.backtrace_summary();
 141
 142    tracing::error!(
 143        service = "client",
 144        version = %report.header.app_version,
 145        os_version = %report.header.os_version,
 146        bundle_id = %report.header.bundle_id,
 147        incident_id = %report.header.incident_id,
 148        installation_id = %installation_id,
 149        description = %description,
 150        backtrace = %summary,
 151        "crash report");
 152
 153    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 154        let payload = slack::WebhookBody::new(|w| {
 155            w.add_section(|s| s.text(slack::Text::markdown(description)))
 156                .add_section(|s| {
 157                    s.add_field(slack::Text::markdown(format!(
 158                        "*Version:*\n{} ({})",
 159                        bundle_id, app_version
 160                    )))
 161                    .add_field({
 162                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 163                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 164                            hostname.strip_prefix("http://").unwrap_or_default()
 165                        });
 166
 167                        slack::Text::markdown(format!(
 168                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 169                            CRASH_REPORTS_BUCKET,
 170                            hostname,
 171                            report.header.incident_id,
 172                            report
 173                                .header
 174                                .incident_id
 175                                .chars()
 176                                .take(8)
 177                                .collect::<String>(),
 178                        ))
 179                    })
 180                })
 181                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 182        });
 183        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 184            log::error!("Failed to serialize payload to JSON: {err}");
 185            Error::Internal(anyhow!(err))
 186        })?;
 187
 188        reqwest::Client::new()
 189            .post(slack_panics_webhook)
 190            .header("Content-Type", "application/json")
 191            .body(payload_json)
 192            .send()
 193            .await
 194            .map_err(|err| {
 195                log::error!("Failed to send payload to Slack: {err}");
 196                Error::Internal(anyhow!(err))
 197            })?;
 198    }
 199
 200    Ok(())
 201}
 202
 203pub async fn post_hang(
 204    Extension(app): Extension<Arc<AppState>>,
 205    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 206    body: Bytes,
 207) -> Result<()> {
 208    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 209        return Err(Error::http(
 210            StatusCode::INTERNAL_SERVER_ERROR,
 211            "events not enabled".into(),
 212        ))?;
 213    };
 214
 215    if checksum != expected {
 216        return Err(Error::http(
 217            StatusCode::BAD_REQUEST,
 218            "invalid checksum".into(),
 219        ))?;
 220    }
 221
 222    let incident_id = Uuid::new_v4().to_string();
 223
 224    // dump JSON into S3 so we can get frame offsets if we need to.
 225    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 226        blob_store_client
 227            .put_object()
 228            .bucket(CRASH_REPORTS_BUCKET)
 229            .key(incident_id.clone() + ".hang.json")
 230            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 231            .body(ByteStream::from(body.to_vec()))
 232            .send()
 233            .await
 234            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 235            .ok();
 236    }
 237
 238    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 239        log::error!("can't parse report json: {err}");
 240        Error::Internal(anyhow!(err))
 241    })?;
 242
 243    let mut backtrace = "Possible hang detected on main thread:".to_string();
 244    let unknown = "<unknown>".to_string();
 245    for frame in report.backtrace.iter() {
 246        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 247    }
 248
 249    tracing::error!(
 250        service = "client",
 251        version = %report.app_version.unwrap_or_default().to_string(),
 252        os_name = %report.os_name,
 253        os_version = report.os_version.unwrap_or_default().to_string(),
 254        incident_id = %incident_id,
 255        installation_id = %report.installation_id.unwrap_or_default(),
 256        backtrace = %backtrace,
 257        "hang report");
 258
 259    Ok(())
 260}
 261
 262pub async fn post_panic(
 263    Extension(app): Extension<Arc<AppState>>,
 264    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 265    body: Bytes,
 266) -> Result<()> {
 267    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 268        return Err(Error::http(
 269            StatusCode::INTERNAL_SERVER_ERROR,
 270            "events not enabled".into(),
 271        ))?;
 272    };
 273
 274    if checksum != expected {
 275        return Err(Error::http(
 276            StatusCode::BAD_REQUEST,
 277            "invalid checksum".into(),
 278        ))?;
 279    }
 280
 281    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 282        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 283    let panic = report.panic;
 284
 285    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
 286        return Err(Error::http(
 287            StatusCode::BAD_REQUEST,
 288            "invalid os version".into(),
 289        ))?;
 290    }
 291
 292    tracing::error!(
 293        service = "client",
 294        version = %panic.app_version,
 295        os_name = %panic.os_name,
 296        os_version = %panic.os_version.clone().unwrap_or_default(),
 297        installation_id = %panic.installation_id.unwrap_or_default(),
 298        description = %panic.payload,
 299        backtrace = %panic.backtrace.join("\n"),
 300        "panic report");
 301
 302    let backtrace = if panic.backtrace.len() > 25 {
 303        let total = panic.backtrace.len();
 304        format!(
 305            "{}\n   and {} more",
 306            panic
 307                .backtrace
 308                .iter()
 309                .take(20)
 310                .cloned()
 311                .collect::<Vec<_>>()
 312                .join("\n"),
 313            total - 20
 314        )
 315    } else {
 316        panic.backtrace.join("\n")
 317    };
 318    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 319
 320    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 321        let payload = slack::WebhookBody::new(|w| {
 322            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 323                .add_section(|s| {
 324                    s.add_field(slack::Text::markdown(format!(
 325                        "*Version:*\n {} ",
 326                        panic.app_version
 327                    )))
 328                    .add_field({
 329                        slack::Text::markdown(format!(
 330                            "*OS:*\n{} {}",
 331                            panic.os_name,
 332                            panic.os_version.unwrap_or_default()
 333                        ))
 334                    })
 335                })
 336                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 337        });
 338        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 339            log::error!("Failed to serialize payload to JSON: {err}");
 340            Error::Internal(anyhow!(err))
 341        })?;
 342
 343        reqwest::Client::new()
 344            .post(slack_panics_webhook)
 345            .header("Content-Type", "application/json")
 346            .body(payload_json)
 347            .send()
 348            .await
 349            .map_err(|err| {
 350                log::error!("Failed to send payload to Slack: {err}");
 351                Error::Internal(anyhow!(err))
 352            })?;
 353    }
 354
 355    Ok(())
 356}
 357
 358pub async fn post_events(
 359    Extension(app): Extension<Arc<AppState>>,
 360    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 361    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 362    body: Bytes,
 363) -> Result<()> {
 364    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 365        Err(Error::http(
 366            StatusCode::NOT_IMPLEMENTED,
 367            "not supported".into(),
 368        ))?
 369    };
 370
 371    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 372        return Err(Error::http(
 373            StatusCode::INTERNAL_SERVER_ERROR,
 374            "events not enabled".into(),
 375        ))?;
 376    };
 377
 378    let checksum_matched = checksum == expected;
 379
 380    let request_body: telemetry_events::EventRequestBody =
 381        serde_json::from_slice(&body).map_err(|err| {
 382            log::error!("can't parse event json: {err}");
 383            Error::Internal(anyhow!(err))
 384        })?;
 385
 386    let mut to_upload = ToUpload::default();
 387    let Some(last_event) = request_body.events.last() else {
 388        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 389    };
 390    let country_code = country_code_header.map(|h| h.to_string());
 391
 392    let first_event_at = chrono::Utc::now()
 393        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 394
 395    for wrapper in &request_body.events {
 396        match &wrapper.event {
 397            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 398                event.clone(),
 399                &wrapper,
 400                &request_body,
 401                first_event_at,
 402                country_code.clone(),
 403                checksum_matched,
 404            )),
 405            // Needed for clients sending old copilot_event types
 406            Event::Copilot(_) => {}
 407            Event::InlineCompletion(event) => {
 408                to_upload
 409                    .inline_completion_events
 410                    .push(InlineCompletionEventRow::from_event(
 411                        event.clone(),
 412                        &wrapper,
 413                        &request_body,
 414                        first_event_at,
 415                        country_code.clone(),
 416                        checksum_matched,
 417                    ))
 418            }
 419            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 420                event.clone(),
 421                &wrapper,
 422                &request_body,
 423                first_event_at,
 424                checksum_matched,
 425            )),
 426            Event::Assistant(event) => {
 427                to_upload
 428                    .assistant_events
 429                    .push(AssistantEventRow::from_event(
 430                        event.clone(),
 431                        &wrapper,
 432                        &request_body,
 433                        first_event_at,
 434                        checksum_matched,
 435                    ))
 436            }
 437            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 438                event.clone(),
 439                &wrapper,
 440                &request_body,
 441                first_event_at,
 442                checksum_matched,
 443            )),
 444            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 445                event.clone(),
 446                &wrapper,
 447                &request_body,
 448                first_event_at,
 449                checksum_matched,
 450            )),
 451            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 452                event.clone(),
 453                &wrapper,
 454                &request_body,
 455                first_event_at,
 456                checksum_matched,
 457            )),
 458            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 459                event.clone(),
 460                &wrapper,
 461                &request_body,
 462                first_event_at,
 463                checksum_matched,
 464            )),
 465            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 466                event.clone(),
 467                &wrapper,
 468                &request_body,
 469                first_event_at,
 470                checksum_matched,
 471            )),
 472            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 473                event.clone(),
 474                &wrapper,
 475                &request_body,
 476                first_event_at,
 477                checksum_matched,
 478            )),
 479            Event::Extension(event) => {
 480                let metadata = app
 481                    .db
 482                    .get_extension_version(&event.extension_id, &event.version)
 483                    .await?;
 484                to_upload
 485                    .extension_events
 486                    .push(ExtensionEventRow::from_event(
 487                        event.clone(),
 488                        &wrapper,
 489                        &request_body,
 490                        metadata,
 491                        first_event_at,
 492                        checksum_matched,
 493                    ))
 494            }
 495            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
 496                event.clone(),
 497                &wrapper,
 498                &request_body,
 499                first_event_at,
 500                checksum_matched,
 501            )),
 502        }
 503    }
 504
 505    to_upload
 506        .upload(&clickhouse_client)
 507        .await
 508        .map_err(|err| Error::Internal(anyhow!(err)))?;
 509
 510    Ok(())
 511}
 512
 513#[derive(Default)]
 514struct ToUpload {
 515    editor_events: Vec<EditorEventRow>,
 516    inline_completion_events: Vec<InlineCompletionEventRow>,
 517    assistant_events: Vec<AssistantEventRow>,
 518    call_events: Vec<CallEventRow>,
 519    cpu_events: Vec<CpuEventRow>,
 520    memory_events: Vec<MemoryEventRow>,
 521    app_events: Vec<AppEventRow>,
 522    setting_events: Vec<SettingEventRow>,
 523    extension_events: Vec<ExtensionEventRow>,
 524    edit_events: Vec<EditEventRow>,
 525    action_events: Vec<ActionEventRow>,
 526    repl_events: Vec<ReplEventRow>,
 527}
 528
 529impl ToUpload {
 530    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 531        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 532        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 533            .await
 534            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 535
 536        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 537        Self::upload_to_table(
 538            INLINE_COMPLETION_EVENTS_TABLE,
 539            &self.inline_completion_events,
 540            clickhouse_client,
 541        )
 542        .await
 543        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 544
 545        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 546        Self::upload_to_table(
 547            ASSISTANT_EVENTS_TABLE,
 548            &self.assistant_events,
 549            clickhouse_client,
 550        )
 551        .await
 552        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 553
 554        const CALL_EVENTS_TABLE: &str = "call_events";
 555        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 556            .await
 557            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 558
 559        const CPU_EVENTS_TABLE: &str = "cpu_events";
 560        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 561            .await
 562            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 563
 564        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 565        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 566            .await
 567            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 568
 569        const APP_EVENTS_TABLE: &str = "app_events";
 570        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 571            .await
 572            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 573
 574        const SETTING_EVENTS_TABLE: &str = "setting_events";
 575        Self::upload_to_table(
 576            SETTING_EVENTS_TABLE,
 577            &self.setting_events,
 578            clickhouse_client,
 579        )
 580        .await
 581        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 582
 583        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 584        Self::upload_to_table(
 585            EXTENSION_EVENTS_TABLE,
 586            &self.extension_events,
 587            clickhouse_client,
 588        )
 589        .await
 590        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 591
 592        const EDIT_EVENTS_TABLE: &str = "edit_events";
 593        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 594            .await
 595            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 596
 597        const ACTION_EVENTS_TABLE: &str = "action_events";
 598        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 599            .await
 600            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 601
 602        const REPL_EVENTS_TABLE: &str = "repl_events";
 603        Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
 604            .await
 605            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
 606
 607        Ok(())
 608    }
 609
 610    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 611        table: &str,
 612        rows: &[T],
 613        clickhouse_client: &clickhouse::Client,
 614    ) -> anyhow::Result<()> {
 615        if rows.is_empty() {
 616            return Ok(());
 617        }
 618
 619        let mut insert = clickhouse_client.insert(table)?;
 620
 621        for event in rows {
 622            insert.write(event).await?;
 623        }
 624
 625        insert.end().await?;
 626
 627        let event_count = rows.len();
 628        log::info!(
 629            "wrote {event_count} {event_specifier} to '{table}'",
 630            event_specifier = if event_count == 1 { "event" } else { "events" }
 631        );
 632
 633        Ok(())
 634    }
 635}
 636
 637pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 638where
 639    S: Serializer,
 640{
 641    if country_code.len() != 2 {
 642        use serde::ser::Error;
 643        return Err(S::Error::custom(
 644            "country_code must be exactly 2 characters",
 645        ));
 646    }
 647
 648    let country_code = country_code.as_bytes();
 649
 650    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 651}
 652
 653#[derive(Serialize, Debug, clickhouse::Row)]
 654pub struct EditorEventRow {
 655    installation_id: String,
 656    metrics_id: String,
 657    operation: String,
 658    app_version: String,
 659    file_extension: String,
 660    os_name: String,
 661    os_version: String,
 662    release_channel: String,
 663    signed_in: bool,
 664    vim_mode: bool,
 665    #[serde(serialize_with = "serialize_country_code")]
 666    country_code: String,
 667    region_code: String,
 668    city: String,
 669    time: i64,
 670    copilot_enabled: bool,
 671    copilot_enabled_for_language: bool,
 672    historical_event: bool,
 673    architecture: String,
 674    is_staff: Option<bool>,
 675    session_id: Option<String>,
 676    major: Option<i32>,
 677    minor: Option<i32>,
 678    patch: Option<i32>,
 679    checksum_matched: bool,
 680}
 681
 682impl EditorEventRow {
 683    fn from_event(
 684        event: EditorEvent,
 685        wrapper: &EventWrapper,
 686        body: &EventRequestBody,
 687        first_event_at: chrono::DateTime<chrono::Utc>,
 688        country_code: Option<String>,
 689        checksum_matched: bool,
 690    ) -> Self {
 691        let semver = body.semver();
 692        let time =
 693            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 694
 695        Self {
 696            app_version: body.app_version.clone(),
 697            major: semver.map(|v| v.major() as i32),
 698            minor: semver.map(|v| v.minor() as i32),
 699            patch: semver.map(|v| v.patch() as i32),
 700            checksum_matched,
 701            release_channel: body.release_channel.clone().unwrap_or_default(),
 702            os_name: body.os_name.clone(),
 703            os_version: body.os_version.clone().unwrap_or_default(),
 704            architecture: body.architecture.clone(),
 705            installation_id: body.installation_id.clone().unwrap_or_default(),
 706            metrics_id: body.metrics_id.clone().unwrap_or_default(),
 707            session_id: body.session_id.clone(),
 708            is_staff: body.is_staff,
 709            time: time.timestamp_millis(),
 710            operation: event.operation,
 711            file_extension: event.file_extension.unwrap_or_default(),
 712            signed_in: wrapper.signed_in,
 713            vim_mode: event.vim_mode,
 714            copilot_enabled: event.copilot_enabled,
 715            copilot_enabled_for_language: event.copilot_enabled_for_language,
 716            country_code: country_code.unwrap_or("XX".to_string()),
 717            region_code: "".to_string(),
 718            city: "".to_string(),
 719            historical_event: false,
 720        }
 721    }
 722}
 723
 724#[derive(Serialize, Debug, clickhouse::Row)]
 725pub struct InlineCompletionEventRow {
 726    installation_id: String,
 727    provider: String,
 728    suggestion_accepted: bool,
 729    app_version: String,
 730    file_extension: String,
 731    os_name: String,
 732    os_version: String,
 733    release_channel: String,
 734    signed_in: bool,
 735    #[serde(serialize_with = "serialize_country_code")]
 736    country_code: String,
 737    region_code: String,
 738    city: String,
 739    time: i64,
 740    is_staff: Option<bool>,
 741    session_id: Option<String>,
 742    major: Option<i32>,
 743    minor: Option<i32>,
 744    patch: Option<i32>,
 745    checksum_matched: bool,
 746}
 747
 748impl InlineCompletionEventRow {
 749    fn from_event(
 750        event: InlineCompletionEvent,
 751        wrapper: &EventWrapper,
 752        body: &EventRequestBody,
 753        first_event_at: chrono::DateTime<chrono::Utc>,
 754        country_code: Option<String>,
 755        checksum_matched: bool,
 756    ) -> Self {
 757        let semver = body.semver();
 758        let time =
 759            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 760
 761        Self {
 762            app_version: body.app_version.clone(),
 763            major: semver.map(|v| v.major() as i32),
 764            minor: semver.map(|v| v.minor() as i32),
 765            patch: semver.map(|v| v.patch() as i32),
 766            checksum_matched,
 767            release_channel: body.release_channel.clone().unwrap_or_default(),
 768            os_name: body.os_name.clone(),
 769            os_version: body.os_version.clone().unwrap_or_default(),
 770            installation_id: body.installation_id.clone().unwrap_or_default(),
 771            session_id: body.session_id.clone(),
 772            is_staff: body.is_staff,
 773            time: time.timestamp_millis(),
 774            file_extension: event.file_extension.unwrap_or_default(),
 775            signed_in: wrapper.signed_in,
 776            country_code: country_code.unwrap_or("XX".to_string()),
 777            region_code: "".to_string(),
 778            city: "".to_string(),
 779            provider: event.provider,
 780            suggestion_accepted: event.suggestion_accepted,
 781        }
 782    }
 783}
 784
 785#[derive(Serialize, Debug, clickhouse::Row)]
 786pub struct CallEventRow {
 787    // AppInfoBase
 788    app_version: String,
 789    major: Option<i32>,
 790    minor: Option<i32>,
 791    patch: Option<i32>,
 792    release_channel: String,
 793    os_name: String,
 794    os_version: String,
 795    checksum_matched: bool,
 796
 797    // ClientEventBase
 798    installation_id: String,
 799    session_id: Option<String>,
 800    is_staff: Option<bool>,
 801    time: i64,
 802
 803    // CallEventRow
 804    operation: String,
 805    room_id: Option<u64>,
 806    channel_id: Option<u64>,
 807}
 808
 809impl CallEventRow {
 810    fn from_event(
 811        event: CallEvent,
 812        wrapper: &EventWrapper,
 813        body: &EventRequestBody,
 814        first_event_at: chrono::DateTime<chrono::Utc>,
 815        checksum_matched: bool,
 816    ) -> Self {
 817        let semver = body.semver();
 818        let time =
 819            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 820
 821        Self {
 822            app_version: body.app_version.clone(),
 823            major: semver.map(|v| v.major() as i32),
 824            minor: semver.map(|v| v.minor() as i32),
 825            patch: semver.map(|v| v.patch() as i32),
 826            checksum_matched,
 827            release_channel: body.release_channel.clone().unwrap_or_default(),
 828            os_name: body.os_name.clone(),
 829            os_version: body.os_version.clone().unwrap_or_default(),
 830            installation_id: body.installation_id.clone().unwrap_or_default(),
 831            session_id: body.session_id.clone(),
 832            is_staff: body.is_staff,
 833            time: time.timestamp_millis(),
 834            operation: event.operation,
 835            room_id: event.room_id,
 836            channel_id: event.channel_id,
 837        }
 838    }
 839}
 840
 841#[derive(Serialize, Debug, clickhouse::Row)]
 842pub struct AssistantEventRow {
 843    // AppInfoBase
 844    app_version: String,
 845    major: Option<i32>,
 846    minor: Option<i32>,
 847    patch: Option<i32>,
 848    checksum_matched: bool,
 849    release_channel: String,
 850    os_name: String,
 851    os_version: String,
 852
 853    // ClientEventBase
 854    installation_id: Option<String>,
 855    session_id: Option<String>,
 856    is_staff: Option<bool>,
 857    time: i64,
 858
 859    // AssistantEventRow
 860    conversation_id: String,
 861    kind: String,
 862    model: String,
 863    response_latency_in_ms: Option<i64>,
 864    error_message: Option<String>,
 865}
 866
 867impl AssistantEventRow {
 868    fn from_event(
 869        event: AssistantEvent,
 870        wrapper: &EventWrapper,
 871        body: &EventRequestBody,
 872        first_event_at: chrono::DateTime<chrono::Utc>,
 873        checksum_matched: bool,
 874    ) -> Self {
 875        let semver = body.semver();
 876        let time =
 877            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 878
 879        Self {
 880            app_version: body.app_version.clone(),
 881            major: semver.map(|v| v.major() as i32),
 882            minor: semver.map(|v| v.minor() as i32),
 883            patch: semver.map(|v| v.patch() as i32),
 884            checksum_matched,
 885            release_channel: body.release_channel.clone().unwrap_or_default(),
 886            os_name: body.os_name.clone(),
 887            os_version: body.os_version.clone().unwrap_or_default(),
 888            installation_id: body.installation_id.clone(),
 889            session_id: body.session_id.clone(),
 890            is_staff: body.is_staff,
 891            time: time.timestamp_millis(),
 892            conversation_id: event.conversation_id.unwrap_or_default(),
 893            kind: event.kind.to_string(),
 894            model: event.model,
 895            response_latency_in_ms: event
 896                .response_latency
 897                .map(|latency| latency.as_millis() as i64),
 898            error_message: event.error_message,
 899        }
 900    }
 901}
 902
 903#[derive(Debug, clickhouse::Row, Serialize)]
 904pub struct CpuEventRow {
 905    installation_id: Option<String>,
 906    is_staff: Option<bool>,
 907    usage_as_percentage: f32,
 908    core_count: u32,
 909    app_version: String,
 910    release_channel: String,
 911    os_name: String,
 912    os_version: String,
 913    time: i64,
 914    session_id: Option<String>,
 915    // pub normalized_cpu_usage: f64, MATERIALIZED
 916    major: Option<i32>,
 917    minor: Option<i32>,
 918    patch: Option<i32>,
 919    checksum_matched: bool,
 920}
 921
 922impl CpuEventRow {
 923    fn from_event(
 924        event: CpuEvent,
 925        wrapper: &EventWrapper,
 926        body: &EventRequestBody,
 927        first_event_at: chrono::DateTime<chrono::Utc>,
 928        checksum_matched: bool,
 929    ) -> Self {
 930        let semver = body.semver();
 931        let time =
 932            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 933
 934        Self {
 935            app_version: body.app_version.clone(),
 936            major: semver.map(|v| v.major() as i32),
 937            minor: semver.map(|v| v.minor() as i32),
 938            patch: semver.map(|v| v.patch() as i32),
 939            checksum_matched,
 940            release_channel: body.release_channel.clone().unwrap_or_default(),
 941            os_name: body.os_name.clone(),
 942            os_version: body.os_version.clone().unwrap_or_default(),
 943            installation_id: body.installation_id.clone(),
 944            session_id: body.session_id.clone(),
 945            is_staff: body.is_staff,
 946            time: time.timestamp_millis(),
 947            usage_as_percentage: event.usage_as_percentage,
 948            core_count: event.core_count,
 949        }
 950    }
 951}
 952
 953#[derive(Serialize, Debug, clickhouse::Row)]
 954pub struct MemoryEventRow {
 955    // AppInfoBase
 956    app_version: String,
 957    major: Option<i32>,
 958    minor: Option<i32>,
 959    patch: Option<i32>,
 960    checksum_matched: bool,
 961    release_channel: String,
 962    os_name: String,
 963    os_version: String,
 964
 965    // ClientEventBase
 966    installation_id: Option<String>,
 967    session_id: Option<String>,
 968    is_staff: Option<bool>,
 969    time: i64,
 970
 971    // MemoryEventRow
 972    memory_in_bytes: u64,
 973    virtual_memory_in_bytes: u64,
 974}
 975
 976impl MemoryEventRow {
 977    fn from_event(
 978        event: MemoryEvent,
 979        wrapper: &EventWrapper,
 980        body: &EventRequestBody,
 981        first_event_at: chrono::DateTime<chrono::Utc>,
 982        checksum_matched: bool,
 983    ) -> Self {
 984        let semver = body.semver();
 985        let time =
 986            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 987
 988        Self {
 989            app_version: body.app_version.clone(),
 990            major: semver.map(|v| v.major() as i32),
 991            minor: semver.map(|v| v.minor() as i32),
 992            patch: semver.map(|v| v.patch() as i32),
 993            checksum_matched,
 994            release_channel: body.release_channel.clone().unwrap_or_default(),
 995            os_name: body.os_name.clone(),
 996            os_version: body.os_version.clone().unwrap_or_default(),
 997            installation_id: body.installation_id.clone(),
 998            session_id: body.session_id.clone(),
 999            is_staff: body.is_staff,
1000            time: time.timestamp_millis(),
1001            memory_in_bytes: event.memory_in_bytes,
1002            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
1003        }
1004    }
1005}
1006
1007#[derive(Serialize, Debug, clickhouse::Row)]
1008pub struct AppEventRow {
1009    // AppInfoBase
1010    app_version: String,
1011    major: Option<i32>,
1012    minor: Option<i32>,
1013    patch: Option<i32>,
1014    checksum_matched: bool,
1015    release_channel: String,
1016    os_name: String,
1017    os_version: String,
1018
1019    // ClientEventBase
1020    installation_id: Option<String>,
1021    session_id: Option<String>,
1022    is_staff: Option<bool>,
1023    time: i64,
1024
1025    // AppEventRow
1026    operation: String,
1027}
1028
1029impl AppEventRow {
1030    fn from_event(
1031        event: AppEvent,
1032        wrapper: &EventWrapper,
1033        body: &EventRequestBody,
1034        first_event_at: chrono::DateTime<chrono::Utc>,
1035        checksum_matched: bool,
1036    ) -> Self {
1037        let semver = body.semver();
1038        let time =
1039            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1040
1041        Self {
1042            app_version: body.app_version.clone(),
1043            major: semver.map(|v| v.major() as i32),
1044            minor: semver.map(|v| v.minor() as i32),
1045            patch: semver.map(|v| v.patch() as i32),
1046            checksum_matched,
1047            release_channel: body.release_channel.clone().unwrap_or_default(),
1048            os_name: body.os_name.clone(),
1049            os_version: body.os_version.clone().unwrap_or_default(),
1050            installation_id: body.installation_id.clone(),
1051            session_id: body.session_id.clone(),
1052            is_staff: body.is_staff,
1053            time: time.timestamp_millis(),
1054            operation: event.operation,
1055        }
1056    }
1057}
1058
1059#[derive(Serialize, Debug, clickhouse::Row)]
1060pub struct SettingEventRow {
1061    // AppInfoBase
1062    app_version: String,
1063    major: Option<i32>,
1064    minor: Option<i32>,
1065    patch: Option<i32>,
1066    checksum_matched: bool,
1067    release_channel: String,
1068    os_name: String,
1069    os_version: String,
1070
1071    // ClientEventBase
1072    installation_id: Option<String>,
1073    session_id: Option<String>,
1074    is_staff: Option<bool>,
1075    time: i64,
1076    // SettingEventRow
1077    setting: String,
1078    value: String,
1079}
1080
1081impl SettingEventRow {
1082    fn from_event(
1083        event: SettingEvent,
1084        wrapper: &EventWrapper,
1085        body: &EventRequestBody,
1086        first_event_at: chrono::DateTime<chrono::Utc>,
1087        checksum_matched: bool,
1088    ) -> Self {
1089        let semver = body.semver();
1090        let time =
1091            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1092
1093        Self {
1094            app_version: body.app_version.clone(),
1095            major: semver.map(|v| v.major() as i32),
1096            minor: semver.map(|v| v.minor() as i32),
1097            checksum_matched,
1098            patch: semver.map(|v| v.patch() as i32),
1099            release_channel: body.release_channel.clone().unwrap_or_default(),
1100            os_name: body.os_name.clone(),
1101            os_version: body.os_version.clone().unwrap_or_default(),
1102            installation_id: body.installation_id.clone(),
1103            session_id: body.session_id.clone(),
1104            is_staff: body.is_staff,
1105            time: time.timestamp_millis(),
1106            setting: event.setting,
1107            value: event.value,
1108        }
1109    }
1110}
1111
1112#[derive(Serialize, Debug, clickhouse::Row)]
1113pub struct ExtensionEventRow {
1114    // AppInfoBase
1115    app_version: String,
1116    major: Option<i32>,
1117    minor: Option<i32>,
1118    patch: Option<i32>,
1119    checksum_matched: bool,
1120    release_channel: String,
1121    os_name: String,
1122    os_version: String,
1123
1124    // ClientEventBase
1125    installation_id: Option<String>,
1126    session_id: Option<String>,
1127    is_staff: Option<bool>,
1128    time: i64,
1129
1130    // ExtensionEventRow
1131    extension_id: Arc<str>,
1132    extension_version: Arc<str>,
1133    dev: bool,
1134    schema_version: Option<i32>,
1135    wasm_api_version: Option<String>,
1136}
1137
1138impl ExtensionEventRow {
1139    fn from_event(
1140        event: ExtensionEvent,
1141        wrapper: &EventWrapper,
1142        body: &EventRequestBody,
1143        extension_metadata: Option<ExtensionMetadata>,
1144        first_event_at: chrono::DateTime<chrono::Utc>,
1145        checksum_matched: bool,
1146    ) -> Self {
1147        let semver = body.semver();
1148        let time =
1149            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1150
1151        Self {
1152            app_version: body.app_version.clone(),
1153            major: semver.map(|v| v.major() as i32),
1154            minor: semver.map(|v| v.minor() as i32),
1155            patch: semver.map(|v| v.patch() as i32),
1156            checksum_matched,
1157            release_channel: body.release_channel.clone().unwrap_or_default(),
1158            os_name: body.os_name.clone(),
1159            os_version: body.os_version.clone().unwrap_or_default(),
1160            installation_id: body.installation_id.clone(),
1161            session_id: body.session_id.clone(),
1162            is_staff: body.is_staff,
1163            time: time.timestamp_millis(),
1164            extension_id: event.extension_id,
1165            extension_version: event.version,
1166            dev: extension_metadata.is_none(),
1167            schema_version: extension_metadata
1168                .as_ref()
1169                .and_then(|metadata| metadata.manifest.schema_version),
1170            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1171                metadata
1172                    .manifest
1173                    .wasm_api_version
1174                    .as_ref()
1175                    .map(|version| version.to_string())
1176            }),
1177        }
1178    }
1179}
1180
1181#[derive(Serialize, Debug, clickhouse::Row)]
1182pub struct ReplEventRow {
1183    // AppInfoBase
1184    app_version: String,
1185    major: Option<i32>,
1186    minor: Option<i32>,
1187    patch: Option<i32>,
1188    checksum_matched: bool,
1189    release_channel: String,
1190    os_name: String,
1191    os_version: String,
1192
1193    // ClientEventBase
1194    installation_id: Option<String>,
1195    session_id: Option<String>,
1196    is_staff: Option<bool>,
1197    time: i64,
1198
1199    // ReplEventRow
1200    kernel_language: String,
1201    kernel_status: String,
1202    repl_session_id: String,
1203}
1204
1205impl ReplEventRow {
1206    fn from_event(
1207        event: ReplEvent,
1208        wrapper: &EventWrapper,
1209        body: &EventRequestBody,
1210        first_event_at: chrono::DateTime<chrono::Utc>,
1211        checksum_matched: bool,
1212    ) -> Self {
1213        let semver = body.semver();
1214        let time =
1215            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1216
1217        Self {
1218            app_version: body.app_version.clone(),
1219            major: semver.map(|v| v.major() as i32),
1220            minor: semver.map(|v| v.minor() as i32),
1221            patch: semver.map(|v| v.patch() as i32),
1222            checksum_matched,
1223            release_channel: body.release_channel.clone().unwrap_or_default(),
1224            os_name: body.os_name.clone(),
1225            os_version: body.os_version.clone().unwrap_or_default(),
1226            installation_id: body.installation_id.clone(),
1227            session_id: body.session_id.clone(),
1228            is_staff: body.is_staff,
1229            time: time.timestamp_millis(),
1230            kernel_language: event.kernel_language,
1231            kernel_status: event.kernel_status,
1232            repl_session_id: event.repl_session_id,
1233        }
1234    }
1235}
1236
1237#[derive(Serialize, Debug, clickhouse::Row)]
1238pub struct EditEventRow {
1239    // AppInfoBase
1240    app_version: String,
1241    major: Option<i32>,
1242    minor: Option<i32>,
1243    patch: Option<i32>,
1244    checksum_matched: bool,
1245    release_channel: String,
1246    os_name: String,
1247    os_version: String,
1248
1249    // ClientEventBase
1250    installation_id: Option<String>,
1251    // Note: This column name has a typo in the ClickHouse table.
1252    #[serde(rename = "sesssion_id")]
1253    session_id: Option<String>,
1254    is_staff: Option<bool>,
1255    time: i64,
1256
1257    // EditEventRow
1258    period_start: i64,
1259    period_end: i64,
1260    environment: String,
1261}
1262
1263impl EditEventRow {
1264    fn from_event(
1265        event: EditEvent,
1266        wrapper: &EventWrapper,
1267        body: &EventRequestBody,
1268        first_event_at: chrono::DateTime<chrono::Utc>,
1269        checksum_matched: bool,
1270    ) -> Self {
1271        let semver = body.semver();
1272        let time =
1273            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1274
1275        let period_start = time - chrono::Duration::milliseconds(event.duration);
1276        let period_end = time;
1277
1278        Self {
1279            app_version: body.app_version.clone(),
1280            major: semver.map(|v| v.major() as i32),
1281            minor: semver.map(|v| v.minor() as i32),
1282            patch: semver.map(|v| v.patch() as i32),
1283            checksum_matched,
1284            release_channel: body.release_channel.clone().unwrap_or_default(),
1285            os_name: body.os_name.clone(),
1286            os_version: body.os_version.clone().unwrap_or_default(),
1287            installation_id: body.installation_id.clone(),
1288            session_id: body.session_id.clone(),
1289            is_staff: body.is_staff,
1290            time: time.timestamp_millis(),
1291            period_start: period_start.timestamp_millis(),
1292            period_end: period_end.timestamp_millis(),
1293            environment: event.environment,
1294        }
1295    }
1296}
1297
1298#[derive(Serialize, Debug, clickhouse::Row)]
1299pub struct ActionEventRow {
1300    // AppInfoBase
1301    app_version: String,
1302    major: Option<i32>,
1303    minor: Option<i32>,
1304    patch: Option<i32>,
1305    checksum_matched: bool,
1306    release_channel: String,
1307    os_name: String,
1308    os_version: String,
1309
1310    // ClientEventBase
1311    installation_id: Option<String>,
1312    // Note: This column name has a typo in the ClickHouse table.
1313    #[serde(rename = "sesssion_id")]
1314    session_id: Option<String>,
1315    is_staff: Option<bool>,
1316    time: i64,
1317    // ActionEventRow
1318    source: String,
1319    action: String,
1320}
1321
1322impl ActionEventRow {
1323    fn from_event(
1324        event: ActionEvent,
1325        wrapper: &EventWrapper,
1326        body: &EventRequestBody,
1327        first_event_at: chrono::DateTime<chrono::Utc>,
1328        checksum_matched: bool,
1329    ) -> Self {
1330        let semver = body.semver();
1331        let time =
1332            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1333
1334        Self {
1335            app_version: body.app_version.clone(),
1336            major: semver.map(|v| v.major() as i32),
1337            minor: semver.map(|v| v.minor() as i32),
1338            patch: semver.map(|v| v.patch() as i32),
1339            checksum_matched,
1340            release_channel: body.release_channel.clone().unwrap_or_default(),
1341            os_name: body.os_name.clone(),
1342            os_version: body.os_version.clone().unwrap_or_default(),
1343            installation_id: body.installation_id.clone(),
1344            session_id: body.session_id.clone(),
1345            is_staff: body.is_staff,
1346            time: time.timestamp_millis(),
1347            source: event.source,
1348            action: event.action,
1349        }
1350    }
1351}
1352
1353pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1354    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1355        return None;
1356    };
1357
1358    let mut summer = Sha256::new();
1359    summer.update(checksum_seed);
1360    summer.update(&json);
1361    summer.update(checksum_seed);
1362    Some(summer.finalize().into_iter().collect())
1363}