events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
  19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
  20};
  21use uuid::Uuid;
  22
/// Name of the blob-store bucket that crash (`.ips`) and hang (`.hang.json`) reports are
/// uploaded to by the handlers in this file.
static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  24
  25pub fn router() -> Router {
  26    Router::new()
  27        .route("/telemetry/events", post(post_events))
  28        .route("/telemetry/crashes", post(post_crash))
  29        .route("/telemetry/panics", post(post_panic))
  30        .route("/telemetry/hangs", post(post_hang))
  31}
  32
  33pub struct ZedChecksumHeader(Vec<u8>);
  34
  35impl Header for ZedChecksumHeader {
  36    fn name() -> &'static HeaderName {
  37        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  38        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  39    }
  40
  41    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  42    where
  43        Self: Sized,
  44        I: Iterator<Item = &'i axum::http::HeaderValue>,
  45    {
  46        let checksum = values
  47            .next()
  48            .ok_or_else(axum::headers::Error::invalid)?
  49            .to_str()
  50            .map_err(|_| axum::headers::Error::invalid())?;
  51
  52        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  53        Ok(Self(bytes))
  54    }
  55
  56    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  57        unimplemented!()
  58    }
  59}
  60
  61pub struct CloudflareIpCountryHeader(String);
  62
  63impl Header for CloudflareIpCountryHeader {
  64    fn name() -> &'static HeaderName {
  65        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  66        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  67    }
  68
  69    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  70    where
  71        Self: Sized,
  72        I: Iterator<Item = &'i axum::http::HeaderValue>,
  73    {
  74        let country_code = values
  75            .next()
  76            .ok_or_else(axum::headers::Error::invalid)?
  77            .to_str()
  78            .map_err(|_| axum::headers::Error::invalid())?;
  79
  80        Ok(Self(country_code.to_string()))
  81    }
  82
  83    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  84        unimplemented!()
  85    }
  86}
  87
  88pub async fn post_crash(
  89    Extension(app): Extension<Arc<AppState>>,
  90    headers: HeaderMap,
  91    body: Bytes,
  92) -> Result<()> {
  93    let report = IpsFile::parse(&body)?;
  94    let version_threshold = SemanticVersion::new(0, 123, 0);
  95
  96    let bundle_id = &report.header.bundle_id;
  97    let app_version = &report.app_version();
  98
  99    if bundle_id == "dev.zed.Zed-Dev" {
 100        log::error!("Crash uploads from {} are ignored.", bundle_id);
 101        return Ok(());
 102    }
 103
 104    if app_version.is_none() || app_version.unwrap() < version_threshold {
 105        log::error!(
 106            "Crash uploads from {} are ignored.",
 107            report.header.app_version
 108        );
 109        return Ok(());
 110    }
 111    let app_version = app_version.unwrap();
 112
 113    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 114        let response = blob_store_client
 115            .head_object()
 116            .bucket(CRASH_REPORTS_BUCKET)
 117            .key(report.header.incident_id.clone() + ".ips")
 118            .send()
 119            .await;
 120
 121        if response.is_ok() {
 122            log::info!("We've already uploaded this crash");
 123            return Ok(());
 124        }
 125
 126        blob_store_client
 127            .put_object()
 128            .bucket(CRASH_REPORTS_BUCKET)
 129            .key(report.header.incident_id.clone() + ".ips")
 130            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 131            .body(ByteStream::from(body.to_vec()))
 132            .send()
 133            .await
 134            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 135            .ok();
 136    }
 137
 138    let recent_panic_on: Option<i64> = headers
 139        .get("x-zed-panicked-on")
 140        .and_then(|h| h.to_str().ok())
 141        .and_then(|s| s.parse().ok());
 142
 143    let installation_id = headers
 144        .get("x-zed-installation-id")
 145        .and_then(|h| h.to_str().ok())
 146        .map(|s| s.to_string())
 147        .unwrap_or_default();
 148
 149    let mut recent_panic = None;
 150
 151    if let Some(recent_panic_on) = recent_panic_on {
 152        let crashed_at = match report.timestamp() {
 153            Ok(t) => Some(t),
 154            Err(e) => {
 155                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 156                None
 157            }
 158        };
 159        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 160            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 161        }
 162    }
 163
 164    let description = report.description(recent_panic);
 165    let summary = report.backtrace_summary();
 166
 167    tracing::error!(
 168        service = "client",
 169        version = %report.header.app_version,
 170        os_version = %report.header.os_version,
 171        bundle_id = %report.header.bundle_id,
 172        incident_id = %report.header.incident_id,
 173        installation_id = %installation_id,
 174        description = %description,
 175        backtrace = %summary,
 176        "crash report");
 177
 178    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 179        let payload = slack::WebhookBody::new(|w| {
 180            w.add_section(|s| s.text(slack::Text::markdown(description)))
 181                .add_section(|s| {
 182                    s.add_field(slack::Text::markdown(format!(
 183                        "*Version:*\n{} ({})",
 184                        bundle_id, app_version
 185                    )))
 186                    .add_field({
 187                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 188                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 189                            hostname.strip_prefix("http://").unwrap_or_default()
 190                        });
 191
 192                        slack::Text::markdown(format!(
 193                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 194                            CRASH_REPORTS_BUCKET,
 195                            hostname,
 196                            report.header.incident_id,
 197                            report
 198                                .header
 199                                .incident_id
 200                                .chars()
 201                                .take(8)
 202                                .collect::<String>(),
 203                        ))
 204                    })
 205                })
 206                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 207        });
 208        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 209            log::error!("Failed to serialize payload to JSON: {err}");
 210            Error::Internal(anyhow!(err))
 211        })?;
 212
 213        reqwest::Client::new()
 214            .post(slack_panics_webhook)
 215            .header("Content-Type", "application/json")
 216            .body(payload_json)
 217            .send()
 218            .await
 219            .map_err(|err| {
 220                log::error!("Failed to send payload to Slack: {err}");
 221                Error::Internal(anyhow!(err))
 222            })?;
 223    }
 224
 225    Ok(())
 226}
 227
 228pub async fn post_hang(
 229    Extension(app): Extension<Arc<AppState>>,
 230    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 231    body: Bytes,
 232) -> Result<()> {
 233    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 234        return Err(Error::Http(
 235            StatusCode::INTERNAL_SERVER_ERROR,
 236            "events not enabled".into(),
 237        ))?;
 238    };
 239
 240    if checksum != expected {
 241        return Err(Error::Http(
 242            StatusCode::BAD_REQUEST,
 243            "invalid checksum".into(),
 244        ))?;
 245    }
 246
 247    let incident_id = Uuid::new_v4().to_string();
 248
 249    // dump JSON into S3 so we can get frame offsets if we need to.
 250    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 251        blob_store_client
 252            .put_object()
 253            .bucket(CRASH_REPORTS_BUCKET)
 254            .key(incident_id.clone() + ".hang.json")
 255            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 256            .body(ByteStream::from(body.to_vec()))
 257            .send()
 258            .await
 259            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 260            .ok();
 261    }
 262
 263    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 264        log::error!("can't parse report json: {err}");
 265        Error::Internal(anyhow!(err))
 266    })?;
 267
 268    let mut backtrace = "Possible hang detected on main thread:".to_string();
 269    let unknown = "<unknown>".to_string();
 270    for frame in report.backtrace.iter() {
 271        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 272    }
 273
 274    tracing::error!(
 275        service = "client",
 276        version = %report.app_version.unwrap_or_default().to_string(),
 277        os_name = %report.os_name,
 278        os_version = report.os_version.unwrap_or_default().to_string(),
 279        incident_id = %incident_id,
 280        installation_id = %report.installation_id.unwrap_or_default(),
 281        backtrace = %backtrace,
 282        "hang report");
 283
 284    Ok(())
 285}
 286
 287pub async fn post_panic(
 288    Extension(app): Extension<Arc<AppState>>,
 289    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 290    body: Bytes,
 291) -> Result<()> {
 292    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 293        return Err(Error::Http(
 294            StatusCode::INTERNAL_SERVER_ERROR,
 295            "events not enabled".into(),
 296        ))?;
 297    };
 298
 299    if checksum != expected {
 300        return Err(Error::Http(
 301            StatusCode::BAD_REQUEST,
 302            "invalid checksum".into(),
 303        ))?;
 304    }
 305
 306    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 307        .map_err(|_| Error::Http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 308    let panic = report.panic;
 309
 310    tracing::error!(
 311        service = "client",
 312        version = %panic.app_version,
 313        os_name = %panic.os_name,
 314        os_version = %panic.os_version.clone().unwrap_or_default(),
 315        installation_id = %panic.installation_id.unwrap_or_default(),
 316        description = %panic.payload,
 317        backtrace = %panic.backtrace.join("\n"),
 318        "panic report");
 319
 320    let backtrace = if panic.backtrace.len() > 25 {
 321        let total = panic.backtrace.len();
 322        format!(
 323            "{}\n   and {} more",
 324            panic
 325                .backtrace
 326                .iter()
 327                .take(20)
 328                .cloned()
 329                .collect::<Vec<_>>()
 330                .join("\n"),
 331            total - 20
 332        )
 333    } else {
 334        panic.backtrace.join("\n")
 335    };
 336    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 337
 338    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 339        let payload = slack::WebhookBody::new(|w| {
 340            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 341                .add_section(|s| {
 342                    s.add_field(slack::Text::markdown(format!(
 343                        "*Version:*\n {} ",
 344                        panic.app_version
 345                    )))
 346                    .add_field({
 347                        slack::Text::markdown(format!(
 348                            "*OS:*\n{} {}",
 349                            panic.os_name,
 350                            panic.os_version.unwrap_or_default()
 351                        ))
 352                    })
 353                })
 354                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 355        });
 356        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 357            log::error!("Failed to serialize payload to JSON: {err}");
 358            Error::Internal(anyhow!(err))
 359        })?;
 360
 361        reqwest::Client::new()
 362            .post(slack_panics_webhook)
 363            .header("Content-Type", "application/json")
 364            .body(payload_json)
 365            .send()
 366            .await
 367            .map_err(|err| {
 368                log::error!("Failed to send payload to Slack: {err}");
 369                Error::Internal(anyhow!(err))
 370            })?;
 371    }
 372
 373    Ok(())
 374}
 375
 376pub async fn post_events(
 377    Extension(app): Extension<Arc<AppState>>,
 378    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 379    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 380    body: Bytes,
 381) -> Result<()> {
 382    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 383        Err(Error::Http(
 384            StatusCode::NOT_IMPLEMENTED,
 385            "not supported".into(),
 386        ))?
 387    };
 388
 389    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 390        return Err(Error::Http(
 391            StatusCode::INTERNAL_SERVER_ERROR,
 392            "events not enabled".into(),
 393        ))?;
 394    };
 395
 396    if checksum != expected {
 397        return Err(Error::Http(
 398            StatusCode::BAD_REQUEST,
 399            "invalid checksum".into(),
 400        ))?;
 401    }
 402
 403    let request_body: telemetry_events::EventRequestBody =
 404        serde_json::from_slice(&body).map_err(|err| {
 405            log::error!("can't parse event json: {err}");
 406            Error::Internal(anyhow!(err))
 407        })?;
 408
 409    let mut to_upload = ToUpload::default();
 410    let Some(last_event) = request_body.events.last() else {
 411        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 412    };
 413    let country_code = country_code_header.map(|h| h.0 .0);
 414
 415    let first_event_at = chrono::Utc::now()
 416        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 417
 418    for wrapper in &request_body.events {
 419        match &wrapper.event {
 420            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 421                event.clone(),
 422                &wrapper,
 423                &request_body,
 424                first_event_at,
 425                country_code.clone(),
 426            )),
 427            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
 428                event.clone(),
 429                &wrapper,
 430                &request_body,
 431                first_event_at,
 432                country_code.clone(),
 433            )),
 434            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 435                event.clone(),
 436                &wrapper,
 437                &request_body,
 438                first_event_at,
 439            )),
 440            Event::Assistant(event) => {
 441                to_upload
 442                    .assistant_events
 443                    .push(AssistantEventRow::from_event(
 444                        event.clone(),
 445                        &wrapper,
 446                        &request_body,
 447                        first_event_at,
 448                    ))
 449            }
 450            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 451                event.clone(),
 452                &wrapper,
 453                &request_body,
 454                first_event_at,
 455            )),
 456            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 457                event.clone(),
 458                &wrapper,
 459                &request_body,
 460                first_event_at,
 461            )),
 462            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 463                event.clone(),
 464                &wrapper,
 465                &request_body,
 466                first_event_at,
 467            )),
 468            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 469                event.clone(),
 470                &wrapper,
 471                &request_body,
 472                first_event_at,
 473            )),
 474            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 475                event.clone(),
 476                &wrapper,
 477                &request_body,
 478                first_event_at,
 479            )),
 480            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 481                event.clone(),
 482                &wrapper,
 483                &request_body,
 484                first_event_at,
 485            )),
 486            Event::Extension(event) => {
 487                let metadata = app
 488                    .db
 489                    .get_extension_version(&event.extension_id, &event.version)
 490                    .await?;
 491                to_upload
 492                    .extension_events
 493                    .push(ExtensionEventRow::from_event(
 494                        event.clone(),
 495                        &wrapper,
 496                        &request_body,
 497                        metadata,
 498                        first_event_at,
 499                    ))
 500            }
 501        }
 502    }
 503
 504    to_upload
 505        .upload(&clickhouse_client)
 506        .await
 507        .map_err(|err| Error::Internal(anyhow!(err)))?;
 508
 509    Ok(())
 510}
 511
 512#[derive(Default)]
 513struct ToUpload {
 514    editor_events: Vec<EditorEventRow>,
 515    copilot_events: Vec<CopilotEventRow>,
 516    assistant_events: Vec<AssistantEventRow>,
 517    call_events: Vec<CallEventRow>,
 518    cpu_events: Vec<CpuEventRow>,
 519    memory_events: Vec<MemoryEventRow>,
 520    app_events: Vec<AppEventRow>,
 521    setting_events: Vec<SettingEventRow>,
 522    extension_events: Vec<ExtensionEventRow>,
 523    edit_events: Vec<EditEventRow>,
 524    action_events: Vec<ActionEventRow>,
 525}
 526
 527impl ToUpload {
 528    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 529        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 530        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 531            .await
 532            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 533
 534        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
 535        Self::upload_to_table(
 536            COPILOT_EVENTS_TABLE,
 537            &self.copilot_events,
 538            clickhouse_client,
 539        )
 540        .await
 541        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
 542
 543        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 544        Self::upload_to_table(
 545            ASSISTANT_EVENTS_TABLE,
 546            &self.assistant_events,
 547            clickhouse_client,
 548        )
 549        .await
 550        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 551
 552        const CALL_EVENTS_TABLE: &str = "call_events";
 553        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 554            .await
 555            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 556
 557        const CPU_EVENTS_TABLE: &str = "cpu_events";
 558        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 559            .await
 560            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 561
 562        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 563        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 564            .await
 565            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 566
 567        const APP_EVENTS_TABLE: &str = "app_events";
 568        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 569            .await
 570            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 571
 572        const SETTING_EVENTS_TABLE: &str = "setting_events";
 573        Self::upload_to_table(
 574            SETTING_EVENTS_TABLE,
 575            &self.setting_events,
 576            clickhouse_client,
 577        )
 578        .await
 579        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 580
 581        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 582        Self::upload_to_table(
 583            EXTENSION_EVENTS_TABLE,
 584            &self.extension_events,
 585            clickhouse_client,
 586        )
 587        .await
 588        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 589
 590        const EDIT_EVENTS_TABLE: &str = "edit_events";
 591        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 592            .await
 593            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 594
 595        const ACTION_EVENTS_TABLE: &str = "action_events";
 596        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 597            .await
 598            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 599
 600        Ok(())
 601    }
 602
 603    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 604        table: &str,
 605        rows: &[T],
 606        clickhouse_client: &clickhouse::Client,
 607    ) -> anyhow::Result<()> {
 608        if !rows.is_empty() {
 609            let mut insert = clickhouse_client.insert(table)?;
 610
 611            for event in rows {
 612                insert.write(event).await?;
 613            }
 614
 615            insert.end().await?;
 616
 617            let event_count = rows.len();
 618            log::info!(
 619                "wrote {event_count} {event_specifier} to '{table}'",
 620                event_specifier = if event_count == 1 { "event" } else { "events" }
 621            );
 622        }
 623
 624        Ok(())
 625    }
 626}
 627
 628pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 629where
 630    S: Serializer,
 631{
 632    if country_code.len() != 2 {
 633        use serde::ser::Error;
 634        return Err(S::Error::custom(
 635            "country_code must be exactly 2 characters",
 636        ));
 637    }
 638
 639    let country_code = country_code.as_bytes();
 640
 641    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
 642}
 643
 644#[derive(Serialize, Debug, clickhouse::Row)]
 645pub struct EditorEventRow {
 646    pub installation_id: String,
 647    pub operation: String,
 648    pub app_version: String,
 649    pub file_extension: String,
 650    pub os_name: String,
 651    pub os_version: String,
 652    pub release_channel: String,
 653    pub signed_in: bool,
 654    pub vim_mode: bool,
 655    #[serde(serialize_with = "serialize_country_code")]
 656    pub country_code: String,
 657    pub region_code: String,
 658    pub city: String,
 659    pub time: i64,
 660    pub copilot_enabled: bool,
 661    pub copilot_enabled_for_language: bool,
 662    pub historical_event: bool,
 663    pub architecture: String,
 664    pub is_staff: Option<bool>,
 665    pub session_id: Option<String>,
 666    pub major: Option<i32>,
 667    pub minor: Option<i32>,
 668    pub patch: Option<i32>,
 669}
 670
 671impl EditorEventRow {
 672    fn from_event(
 673        event: EditorEvent,
 674        wrapper: &EventWrapper,
 675        body: &EventRequestBody,
 676        first_event_at: chrono::DateTime<chrono::Utc>,
 677        country_code: Option<String>,
 678    ) -> Self {
 679        let semver = body.semver();
 680        let time =
 681            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 682
 683        Self {
 684            app_version: body.app_version.clone(),
 685            major: semver.map(|v| v.major() as i32),
 686            minor: semver.map(|v| v.minor() as i32),
 687            patch: semver.map(|v| v.patch() as i32),
 688            release_channel: body.release_channel.clone().unwrap_or_default(),
 689            os_name: body.os_name.clone(),
 690            os_version: body.os_version.clone().unwrap_or_default(),
 691            architecture: body.architecture.clone(),
 692            installation_id: body.installation_id.clone().unwrap_or_default(),
 693            session_id: body.session_id.clone(),
 694            is_staff: body.is_staff,
 695            time: time.timestamp_millis(),
 696            operation: event.operation,
 697            file_extension: event.file_extension.unwrap_or_default(),
 698            signed_in: wrapper.signed_in,
 699            vim_mode: event.vim_mode,
 700            copilot_enabled: event.copilot_enabled,
 701            copilot_enabled_for_language: event.copilot_enabled_for_language,
 702            country_code: country_code.unwrap_or("XX".to_string()),
 703            region_code: "".to_string(),
 704            city: "".to_string(),
 705            historical_event: false,
 706        }
 707    }
 708}
 709
 710#[derive(Serialize, Debug, clickhouse::Row)]
 711pub struct CopilotEventRow {
 712    pub installation_id: String,
 713    pub suggestion_id: String,
 714    pub suggestion_accepted: bool,
 715    pub app_version: String,
 716    pub file_extension: String,
 717    pub os_name: String,
 718    pub os_version: String,
 719    pub release_channel: String,
 720    pub signed_in: bool,
 721    #[serde(serialize_with = "serialize_country_code")]
 722    pub country_code: String,
 723    pub region_code: String,
 724    pub city: String,
 725    pub time: i64,
 726    pub is_staff: Option<bool>,
 727    pub session_id: Option<String>,
 728    pub major: Option<i32>,
 729    pub minor: Option<i32>,
 730    pub patch: Option<i32>,
 731}
 732
 733impl CopilotEventRow {
 734    fn from_event(
 735        event: CopilotEvent,
 736        wrapper: &EventWrapper,
 737        body: &EventRequestBody,
 738        first_event_at: chrono::DateTime<chrono::Utc>,
 739        country_code: Option<String>,
 740    ) -> Self {
 741        let semver = body.semver();
 742        let time =
 743            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 744
 745        Self {
 746            app_version: body.app_version.clone(),
 747            major: semver.map(|v| v.major() as i32),
 748            minor: semver.map(|v| v.minor() as i32),
 749            patch: semver.map(|v| v.patch() as i32),
 750            release_channel: body.release_channel.clone().unwrap_or_default(),
 751            os_name: body.os_name.clone(),
 752            os_version: body.os_version.clone().unwrap_or_default(),
 753            installation_id: body.installation_id.clone().unwrap_or_default(),
 754            session_id: body.session_id.clone(),
 755            is_staff: body.is_staff,
 756            time: time.timestamp_millis(),
 757            file_extension: event.file_extension.unwrap_or_default(),
 758            signed_in: wrapper.signed_in,
 759            country_code: country_code.unwrap_or("XX".to_string()),
 760            region_code: "".to_string(),
 761            city: "".to_string(),
 762            suggestion_id: event.suggestion_id.unwrap_or_default(),
 763            suggestion_accepted: event.suggestion_accepted,
 764        }
 765    }
 766}
 767
 768#[derive(Serialize, Debug, clickhouse::Row)]
 769pub struct CallEventRow {
 770    // AppInfoBase
 771    app_version: String,
 772    major: Option<i32>,
 773    minor: Option<i32>,
 774    patch: Option<i32>,
 775    release_channel: String,
 776
 777    // ClientEventBase
 778    installation_id: String,
 779    session_id: Option<String>,
 780    is_staff: Option<bool>,
 781    time: i64,
 782
 783    // CallEventRow
 784    operation: String,
 785    room_id: Option<u64>,
 786    channel_id: Option<u64>,
 787}
 788
 789impl CallEventRow {
 790    fn from_event(
 791        event: CallEvent,
 792        wrapper: &EventWrapper,
 793        body: &EventRequestBody,
 794        first_event_at: chrono::DateTime<chrono::Utc>,
 795    ) -> Self {
 796        let semver = body.semver();
 797        let time =
 798            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 799
 800        Self {
 801            app_version: body.app_version.clone(),
 802            major: semver.map(|v| v.major() as i32),
 803            minor: semver.map(|v| v.minor() as i32),
 804            patch: semver.map(|v| v.patch() as i32),
 805            release_channel: body.release_channel.clone().unwrap_or_default(),
 806            installation_id: body.installation_id.clone().unwrap_or_default(),
 807            session_id: body.session_id.clone(),
 808            is_staff: body.is_staff,
 809            time: time.timestamp_millis(),
 810            operation: event.operation,
 811            room_id: event.room_id,
 812            channel_id: event.channel_id,
 813        }
 814    }
 815}
 816
 817#[derive(Serialize, Debug, clickhouse::Row)]
 818pub struct AssistantEventRow {
 819    // AppInfoBase
 820    app_version: String,
 821    major: Option<i32>,
 822    minor: Option<i32>,
 823    patch: Option<i32>,
 824    release_channel: String,
 825
 826    // ClientEventBase
 827    installation_id: Option<String>,
 828    session_id: Option<String>,
 829    is_staff: Option<bool>,
 830    time: i64,
 831
 832    // AssistantEventRow
 833    conversation_id: String,
 834    kind: String,
 835    model: String,
 836    response_latency_in_ms: Option<i64>,
 837    error_message: Option<String>,
 838}
 839
 840impl AssistantEventRow {
 841    fn from_event(
 842        event: AssistantEvent,
 843        wrapper: &EventWrapper,
 844        body: &EventRequestBody,
 845        first_event_at: chrono::DateTime<chrono::Utc>,
 846    ) -> Self {
 847        let semver = body.semver();
 848        let time =
 849            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 850
 851        Self {
 852            app_version: body.app_version.clone(),
 853            major: semver.map(|v| v.major() as i32),
 854            minor: semver.map(|v| v.minor() as i32),
 855            patch: semver.map(|v| v.patch() as i32),
 856            release_channel: body.release_channel.clone().unwrap_or_default(),
 857            installation_id: body.installation_id.clone(),
 858            session_id: body.session_id.clone(),
 859            is_staff: body.is_staff,
 860            time: time.timestamp_millis(),
 861            conversation_id: event.conversation_id.unwrap_or_default(),
 862            kind: event.kind.to_string(),
 863            model: event.model,
 864            response_latency_in_ms: event
 865                .response_latency
 866                .map(|latency| latency.as_millis() as i64),
 867            error_message: event.error_message,
 868        }
 869    }
 870}
 871
 872#[derive(Debug, clickhouse::Row, Serialize)]
 873pub struct CpuEventRow {
 874    pub installation_id: Option<String>,
 875    pub is_staff: Option<bool>,
 876    pub usage_as_percentage: f32,
 877    pub core_count: u32,
 878    pub app_version: String,
 879    pub release_channel: String,
 880    pub time: i64,
 881    pub session_id: Option<String>,
 882    // pub normalized_cpu_usage: f64, MATERIALIZED
 883    pub major: Option<i32>,
 884    pub minor: Option<i32>,
 885    pub patch: Option<i32>,
 886}
 887
 888impl CpuEventRow {
 889    fn from_event(
 890        event: CpuEvent,
 891        wrapper: &EventWrapper,
 892        body: &EventRequestBody,
 893        first_event_at: chrono::DateTime<chrono::Utc>,
 894    ) -> Self {
 895        let semver = body.semver();
 896        let time =
 897            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 898
 899        Self {
 900            app_version: body.app_version.clone(),
 901            major: semver.map(|v| v.major() as i32),
 902            minor: semver.map(|v| v.minor() as i32),
 903            patch: semver.map(|v| v.patch() as i32),
 904            release_channel: body.release_channel.clone().unwrap_or_default(),
 905            installation_id: body.installation_id.clone(),
 906            session_id: body.session_id.clone(),
 907            is_staff: body.is_staff,
 908            time: time.timestamp_millis(),
 909            usage_as_percentage: event.usage_as_percentage,
 910            core_count: event.core_count,
 911        }
 912    }
 913}
 914
 915#[derive(Serialize, Debug, clickhouse::Row)]
 916pub struct MemoryEventRow {
 917    // AppInfoBase
 918    app_version: String,
 919    major: Option<i32>,
 920    minor: Option<i32>,
 921    patch: Option<i32>,
 922    release_channel: String,
 923
 924    // ClientEventBase
 925    installation_id: Option<String>,
 926    session_id: Option<String>,
 927    is_staff: Option<bool>,
 928    time: i64,
 929
 930    // MemoryEventRow
 931    memory_in_bytes: u64,
 932    virtual_memory_in_bytes: u64,
 933}
 934
 935impl MemoryEventRow {
 936    fn from_event(
 937        event: MemoryEvent,
 938        wrapper: &EventWrapper,
 939        body: &EventRequestBody,
 940        first_event_at: chrono::DateTime<chrono::Utc>,
 941    ) -> Self {
 942        let semver = body.semver();
 943        let time =
 944            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 945
 946        Self {
 947            app_version: body.app_version.clone(),
 948            major: semver.map(|v| v.major() as i32),
 949            minor: semver.map(|v| v.minor() as i32),
 950            patch: semver.map(|v| v.patch() as i32),
 951            release_channel: body.release_channel.clone().unwrap_or_default(),
 952            installation_id: body.installation_id.clone(),
 953            session_id: body.session_id.clone(),
 954            is_staff: body.is_staff,
 955            time: time.timestamp_millis(),
 956            memory_in_bytes: event.memory_in_bytes,
 957            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 958        }
 959    }
 960}
 961
 962#[derive(Serialize, Debug, clickhouse::Row)]
 963pub struct AppEventRow {
 964    // AppInfoBase
 965    app_version: String,
 966    major: Option<i32>,
 967    minor: Option<i32>,
 968    patch: Option<i32>,
 969    release_channel: String,
 970
 971    // ClientEventBase
 972    installation_id: Option<String>,
 973    session_id: Option<String>,
 974    is_staff: Option<bool>,
 975    time: i64,
 976
 977    // AppEventRow
 978    operation: String,
 979}
 980
 981impl AppEventRow {
 982    fn from_event(
 983        event: AppEvent,
 984        wrapper: &EventWrapper,
 985        body: &EventRequestBody,
 986        first_event_at: chrono::DateTime<chrono::Utc>,
 987    ) -> Self {
 988        let semver = body.semver();
 989        let time =
 990            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 991
 992        Self {
 993            app_version: body.app_version.clone(),
 994            major: semver.map(|v| v.major() as i32),
 995            minor: semver.map(|v| v.minor() as i32),
 996            patch: semver.map(|v| v.patch() as i32),
 997            release_channel: body.release_channel.clone().unwrap_or_default(),
 998            installation_id: body.installation_id.clone(),
 999            session_id: body.session_id.clone(),
1000            is_staff: body.is_staff,
1001            time: time.timestamp_millis(),
1002            operation: event.operation,
1003        }
1004    }
1005}
1006
1007#[derive(Serialize, Debug, clickhouse::Row)]
1008pub struct SettingEventRow {
1009    // AppInfoBase
1010    app_version: String,
1011    major: Option<i32>,
1012    minor: Option<i32>,
1013    patch: Option<i32>,
1014    release_channel: String,
1015
1016    // ClientEventBase
1017    installation_id: Option<String>,
1018    session_id: Option<String>,
1019    is_staff: Option<bool>,
1020    time: i64,
1021    // SettingEventRow
1022    setting: String,
1023    value: String,
1024}
1025
1026impl SettingEventRow {
1027    fn from_event(
1028        event: SettingEvent,
1029        wrapper: &EventWrapper,
1030        body: &EventRequestBody,
1031        first_event_at: chrono::DateTime<chrono::Utc>,
1032    ) -> Self {
1033        let semver = body.semver();
1034        let time =
1035            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1036
1037        Self {
1038            app_version: body.app_version.clone(),
1039            major: semver.map(|v| v.major() as i32),
1040            minor: semver.map(|v| v.minor() as i32),
1041            patch: semver.map(|v| v.patch() as i32),
1042            release_channel: body.release_channel.clone().unwrap_or_default(),
1043            installation_id: body.installation_id.clone(),
1044            session_id: body.session_id.clone(),
1045            is_staff: body.is_staff,
1046            time: time.timestamp_millis(),
1047            setting: event.setting,
1048            value: event.value,
1049        }
1050    }
1051}
1052
1053#[derive(Serialize, Debug, clickhouse::Row)]
1054pub struct ExtensionEventRow {
1055    // AppInfoBase
1056    app_version: String,
1057    major: Option<i32>,
1058    minor: Option<i32>,
1059    patch: Option<i32>,
1060    release_channel: String,
1061
1062    // ClientEventBase
1063    installation_id: Option<String>,
1064    session_id: Option<String>,
1065    is_staff: Option<bool>,
1066    time: i64,
1067
1068    // ExtensionEventRow
1069    extension_id: Arc<str>,
1070    extension_version: Arc<str>,
1071    dev: bool,
1072    schema_version: Option<i32>,
1073    wasm_api_version: Option<String>,
1074}
1075
1076impl ExtensionEventRow {
1077    fn from_event(
1078        event: ExtensionEvent,
1079        wrapper: &EventWrapper,
1080        body: &EventRequestBody,
1081        extension_metadata: Option<ExtensionMetadata>,
1082        first_event_at: chrono::DateTime<chrono::Utc>,
1083    ) -> Self {
1084        let semver = body.semver();
1085        let time =
1086            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1087
1088        Self {
1089            app_version: body.app_version.clone(),
1090            major: semver.map(|v| v.major() as i32),
1091            minor: semver.map(|v| v.minor() as i32),
1092            patch: semver.map(|v| v.patch() as i32),
1093            release_channel: body.release_channel.clone().unwrap_or_default(),
1094            installation_id: body.installation_id.clone(),
1095            session_id: body.session_id.clone(),
1096            is_staff: body.is_staff,
1097            time: time.timestamp_millis(),
1098            extension_id: event.extension_id,
1099            extension_version: event.version,
1100            dev: extension_metadata.is_none(),
1101            schema_version: extension_metadata
1102                .as_ref()
1103                .and_then(|metadata| metadata.manifest.schema_version),
1104            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1105                metadata
1106                    .manifest
1107                    .wasm_api_version
1108                    .as_ref()
1109                    .map(|version| version.to_string())
1110            }),
1111        }
1112    }
1113}
1114
1115#[derive(Serialize, Debug, clickhouse::Row)]
1116pub struct EditEventRow {
1117    // AppInfoBase
1118    app_version: String,
1119    major: Option<i32>,
1120    minor: Option<i32>,
1121    patch: Option<i32>,
1122    release_channel: String,
1123
1124    // ClientEventBase
1125    installation_id: Option<String>,
1126    // Note: This column name has a typo in the ClickHouse table.
1127    #[serde(rename = "sesssion_id")]
1128    session_id: Option<String>,
1129    is_staff: Option<bool>,
1130    time: i64,
1131
1132    // EditEventRow
1133    period_start: i64,
1134    period_end: i64,
1135    environment: String,
1136}
1137
1138impl EditEventRow {
1139    fn from_event(
1140        event: EditEvent,
1141        wrapper: &EventWrapper,
1142        body: &EventRequestBody,
1143        first_event_at: chrono::DateTime<chrono::Utc>,
1144    ) -> Self {
1145        let semver = body.semver();
1146        let time =
1147            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1148
1149        let period_start = time - chrono::Duration::milliseconds(event.duration);
1150        let period_end = time;
1151
1152        Self {
1153            app_version: body.app_version.clone(),
1154            major: semver.map(|v| v.major() as i32),
1155            minor: semver.map(|v| v.minor() as i32),
1156            patch: semver.map(|v| v.patch() as i32),
1157            release_channel: body.release_channel.clone().unwrap_or_default(),
1158            installation_id: body.installation_id.clone(),
1159            session_id: body.session_id.clone(),
1160            is_staff: body.is_staff,
1161            time: time.timestamp_millis(),
1162            period_start: period_start.timestamp_millis(),
1163            period_end: period_end.timestamp_millis(),
1164            environment: event.environment,
1165        }
1166    }
1167}
1168
1169#[derive(Serialize, Debug, clickhouse::Row)]
1170pub struct ActionEventRow {
1171    // AppInfoBase
1172    app_version: String,
1173    major: Option<i32>,
1174    minor: Option<i32>,
1175    patch: Option<i32>,
1176    release_channel: String,
1177
1178    // ClientEventBase
1179    installation_id: Option<String>,
1180    // Note: This column name has a typo in the ClickHouse table.
1181    #[serde(rename = "sesssion_id")]
1182    session_id: Option<String>,
1183    is_staff: Option<bool>,
1184    time: i64,
1185    // ActionEventRow
1186    source: String,
1187    action: String,
1188}
1189
1190impl ActionEventRow {
1191    fn from_event(
1192        event: ActionEvent,
1193        wrapper: &EventWrapper,
1194        body: &EventRequestBody,
1195        first_event_at: chrono::DateTime<chrono::Utc>,
1196    ) -> Self {
1197        let semver = body.semver();
1198        let time =
1199            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1200
1201        Self {
1202            app_version: body.app_version.clone(),
1203            major: semver.map(|v| v.major() as i32),
1204            minor: semver.map(|v| v.minor() as i32),
1205            patch: semver.map(|v| v.patch() as i32),
1206            release_channel: body.release_channel.clone().unwrap_or_default(),
1207            installation_id: body.installation_id.clone(),
1208            session_id: body.session_id.clone(),
1209            is_staff: body.is_staff,
1210            time: time.timestamp_millis(),
1211            source: event.source,
1212            action: event.action,
1213        }
1214    }
1215}
1216
1217pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1218    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1219        return None;
1220    };
1221
1222    let mut summer = Sha256::new();
1223    summer.update(checksum_seed);
1224    summer.update(&json);
1225    summer.update(checksum_seed);
1226    Some(summer.finalize().into_iter().collect())
1227}