events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  19    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent,
  20    SettingEvent,
  21};
  22use uuid::Uuid;
  23
/// Name of the blob store bucket that receives uploaded crash (`.ips`) and
/// hang (`.hang.json`) reports.
static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  25
  26pub fn router() -> Router {
  27    Router::new()
  28        .route("/telemetry/events", post(post_events))
  29        .route("/telemetry/crashes", post(post_crash))
  30        .route("/telemetry/panics", post(post_panic))
  31        .route("/telemetry/hangs", post(post_hang))
  32}
  33
  34pub struct ZedChecksumHeader(Vec<u8>);
  35
  36impl Header for ZedChecksumHeader {
  37    fn name() -> &'static HeaderName {
  38        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  39        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  40    }
  41
  42    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  43    where
  44        Self: Sized,
  45        I: Iterator<Item = &'i axum::http::HeaderValue>,
  46    {
  47        let checksum = values
  48            .next()
  49            .ok_or_else(axum::headers::Error::invalid)?
  50            .to_str()
  51            .map_err(|_| axum::headers::Error::invalid())?;
  52
  53        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  54        Ok(Self(bytes))
  55    }
  56
  57    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  58        unimplemented!()
  59    }
  60}
  61
  62pub struct CloudflareIpCountryHeader(String);
  63
  64impl Header for CloudflareIpCountryHeader {
  65    fn name() -> &'static HeaderName {
  66        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  67        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  68    }
  69
  70    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  71    where
  72        Self: Sized,
  73        I: Iterator<Item = &'i axum::http::HeaderValue>,
  74    {
  75        let country_code = values
  76            .next()
  77            .ok_or_else(axum::headers::Error::invalid)?
  78            .to_str()
  79            .map_err(|_| axum::headers::Error::invalid())?;
  80
  81        Ok(Self(country_code.to_string()))
  82    }
  83
  84    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  85        unimplemented!()
  86    }
  87}
  88
  89pub async fn post_crash(
  90    Extension(app): Extension<Arc<AppState>>,
  91    headers: HeaderMap,
  92    body: Bytes,
  93) -> Result<()> {
  94    let report = IpsFile::parse(&body)?;
  95    let version_threshold = SemanticVersion::new(0, 123, 0);
  96
  97    let bundle_id = &report.header.bundle_id;
  98    let app_version = &report.app_version();
  99
 100    if bundle_id == "dev.zed.Zed-Dev" {
 101        log::error!("Crash uploads from {} are ignored.", bundle_id);
 102        return Ok(());
 103    }
 104
 105    if app_version.is_none() || app_version.unwrap() < version_threshold {
 106        log::error!(
 107            "Crash uploads from {} are ignored.",
 108            report.header.app_version
 109        );
 110        return Ok(());
 111    }
 112    let app_version = app_version.unwrap();
 113
 114    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 115        let response = blob_store_client
 116            .head_object()
 117            .bucket(CRASH_REPORTS_BUCKET)
 118            .key(report.header.incident_id.clone() + ".ips")
 119            .send()
 120            .await;
 121
 122        if response.is_ok() {
 123            log::info!("We've already uploaded this crash");
 124            return Ok(());
 125        }
 126
 127        blob_store_client
 128            .put_object()
 129            .bucket(CRASH_REPORTS_BUCKET)
 130            .key(report.header.incident_id.clone() + ".ips")
 131            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 132            .body(ByteStream::from(body.to_vec()))
 133            .send()
 134            .await
 135            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 136            .ok();
 137    }
 138
 139    let recent_panic_on: Option<i64> = headers
 140        .get("x-zed-panicked-on")
 141        .and_then(|h| h.to_str().ok())
 142        .and_then(|s| s.parse().ok());
 143
 144    let installation_id = headers
 145        .get("x-zed-installation-id")
 146        .and_then(|h| h.to_str().ok())
 147        .map(|s| s.to_string())
 148        .unwrap_or_default();
 149
 150    let mut recent_panic = None;
 151
 152    if let Some(recent_panic_on) = recent_panic_on {
 153        let crashed_at = match report.timestamp() {
 154            Ok(t) => Some(t),
 155            Err(e) => {
 156                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 157                None
 158            }
 159        };
 160        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 161            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 162        }
 163    }
 164
 165    let description = report.description(recent_panic);
 166    let summary = report.backtrace_summary();
 167
 168    tracing::error!(
 169        service = "client",
 170        version = %report.header.app_version,
 171        os_version = %report.header.os_version,
 172        bundle_id = %report.header.bundle_id,
 173        incident_id = %report.header.incident_id,
 174        installation_id = %installation_id,
 175        description = %description,
 176        backtrace = %summary,
 177        "crash report");
 178
 179    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 180        let payload = slack::WebhookBody::new(|w| {
 181            w.add_section(|s| s.text(slack::Text::markdown(description)))
 182                .add_section(|s| {
 183                    s.add_field(slack::Text::markdown(format!(
 184                        "*Version:*\n{} ({})",
 185                        bundle_id, app_version
 186                    )))
 187                    .add_field({
 188                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 189                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 190                            hostname.strip_prefix("http://").unwrap_or_default()
 191                        });
 192
 193                        slack::Text::markdown(format!(
 194                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 195                            CRASH_REPORTS_BUCKET,
 196                            hostname,
 197                            report.header.incident_id,
 198                            report
 199                                .header
 200                                .incident_id
 201                                .chars()
 202                                .take(8)
 203                                .collect::<String>(),
 204                        ))
 205                    })
 206                })
 207                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 208        });
 209        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 210            log::error!("Failed to serialize payload to JSON: {err}");
 211            Error::Internal(anyhow!(err))
 212        })?;
 213
 214        reqwest::Client::new()
 215            .post(slack_panics_webhook)
 216            .header("Content-Type", "application/json")
 217            .body(payload_json)
 218            .send()
 219            .await
 220            .map_err(|err| {
 221                log::error!("Failed to send payload to Slack: {err}");
 222                Error::Internal(anyhow!(err))
 223            })?;
 224    }
 225
 226    Ok(())
 227}
 228
 229pub async fn post_hang(
 230    Extension(app): Extension<Arc<AppState>>,
 231    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 232    body: Bytes,
 233) -> Result<()> {
 234    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 235        return Err(Error::Http(
 236            StatusCode::INTERNAL_SERVER_ERROR,
 237            "events not enabled".into(),
 238        ))?;
 239    };
 240
 241    if checksum != expected {
 242        return Err(Error::Http(
 243            StatusCode::BAD_REQUEST,
 244            "invalid checksum".into(),
 245        ))?;
 246    }
 247
 248    let incident_id = Uuid::new_v4().to_string();
 249
 250    // dump JSON into S3 so we can get frame offsets if we need to.
 251    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 252        blob_store_client
 253            .put_object()
 254            .bucket(CRASH_REPORTS_BUCKET)
 255            .key(incident_id.clone() + ".hang.json")
 256            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 257            .body(ByteStream::from(body.to_vec()))
 258            .send()
 259            .await
 260            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 261            .ok();
 262    }
 263
 264    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 265        log::error!("can't parse report json: {err}");
 266        Error::Internal(anyhow!(err))
 267    })?;
 268
 269    let mut backtrace = "Possible hang detected on main thread:".to_string();
 270    let unknown = "<unknown>".to_string();
 271    for frame in report.backtrace.iter() {
 272        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 273    }
 274
 275    tracing::error!(
 276        service = "client",
 277        version = %report.app_version.unwrap_or_default().to_string(),
 278        os_name = %report.os_name,
 279        os_version = report.os_version.unwrap_or_default().to_string(),
 280        incident_id = %incident_id,
 281        installation_id = %report.installation_id.unwrap_or_default(),
 282        backtrace = %backtrace,
 283        "hang report");
 284
 285    Ok(())
 286}
 287
 288pub async fn post_panic(
 289    Extension(app): Extension<Arc<AppState>>,
 290    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 291    body: Bytes,
 292) -> Result<()> {
 293    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 294        return Err(Error::Http(
 295            StatusCode::INTERNAL_SERVER_ERROR,
 296            "events not enabled".into(),
 297        ))?;
 298    };
 299
 300    if checksum != expected {
 301        return Err(Error::Http(
 302            StatusCode::BAD_REQUEST,
 303            "invalid checksum".into(),
 304        ))?;
 305    }
 306
 307    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 308        .map_err(|_| Error::Http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 309    let panic = report.panic;
 310
 311    // better OS reporting for linux (because linux is hard):
 312    // - Remove os_version/app_version/os_name from the gpui platform trait
 313    // - Move platform processing data into client/telemetry
 314    // - Duplicate some small code in macOS platform for a version check
 315    // - Add GPUI API for reporting the selected platform integration
 316    //  - macos-blade, macos-metal, linux-X11, linux-headless
 317    // if cfg(macos( { "Macos" } else { "Linux-{cx.compositor_name()"} ))
 318
 319    tracing::error!(
 320        service = "client",
 321        version = %panic.app_version,
 322        os_name = %panic.os_name,
 323        os_version = %panic.os_version.clone().unwrap_or_default(),
 324        installation_id = %panic.installation_id.unwrap_or_default(),
 325        description = %panic.payload,
 326        backtrace = %panic.backtrace.join("\n"),
 327        "panic report");
 328
 329    let backtrace = if panic.backtrace.len() > 25 {
 330        let total = panic.backtrace.len();
 331        format!(
 332            "{}\n   and {} more",
 333            panic
 334                .backtrace
 335                .iter()
 336                .take(20)
 337                .cloned()
 338                .collect::<Vec<_>>()
 339                .join("\n"),
 340            total - 20
 341        )
 342    } else {
 343        panic.backtrace.join("\n")
 344    };
 345    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 346
 347    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 348        let payload = slack::WebhookBody::new(|w| {
 349            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 350                .add_section(|s| {
 351                    s.add_field(slack::Text::markdown(format!(
 352                        "*Version:*\n {} ",
 353                        panic.app_version
 354                    )))
 355                    .add_field({
 356                        slack::Text::markdown(format!(
 357                            "*OS:*\n{} {}",
 358                            panic.os_name,
 359                            panic.os_version.unwrap_or_default()
 360                        ))
 361                    })
 362                })
 363                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 364        });
 365        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 366            log::error!("Failed to serialize payload to JSON: {err}");
 367            Error::Internal(anyhow!(err))
 368        })?;
 369
 370        reqwest::Client::new()
 371            .post(slack_panics_webhook)
 372            .header("Content-Type", "application/json")
 373            .body(payload_json)
 374            .send()
 375            .await
 376            .map_err(|err| {
 377                log::error!("Failed to send payload to Slack: {err}");
 378                Error::Internal(anyhow!(err))
 379            })?;
 380    }
 381
 382    Ok(())
 383}
 384
 385pub async fn post_events(
 386    Extension(app): Extension<Arc<AppState>>,
 387    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 388    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 389    body: Bytes,
 390) -> Result<()> {
 391    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 392        Err(Error::Http(
 393            StatusCode::NOT_IMPLEMENTED,
 394            "not supported".into(),
 395        ))?
 396    };
 397
 398    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 399        return Err(Error::Http(
 400            StatusCode::INTERNAL_SERVER_ERROR,
 401            "events not enabled".into(),
 402        ))?;
 403    };
 404
 405    if checksum != expected {
 406        return Err(Error::Http(
 407            StatusCode::BAD_REQUEST,
 408            "invalid checksum".into(),
 409        ))?;
 410    }
 411
 412    let request_body: telemetry_events::EventRequestBody =
 413        serde_json::from_slice(&body).map_err(|err| {
 414            log::error!("can't parse event json: {err}");
 415            Error::Internal(anyhow!(err))
 416        })?;
 417
 418    let mut to_upload = ToUpload::default();
 419    let Some(last_event) = request_body.events.last() else {
 420        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 421    };
 422    let country_code = country_code_header.map(|h| h.0 .0);
 423
 424    let first_event_at = chrono::Utc::now()
 425        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 426
 427    for wrapper in &request_body.events {
 428        match &wrapper.event {
 429            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 430                event.clone(),
 431                &wrapper,
 432                &request_body,
 433                first_event_at,
 434                country_code.clone(),
 435            )),
 436            // Needed for clients sending old copilot_event types
 437            Event::Copilot(_) => {}
 438            Event::InlineCompletion(event) => {
 439                to_upload
 440                    .inline_completion_events
 441                    .push(InlineCompletionEventRow::from_event(
 442                        event.clone(),
 443                        &wrapper,
 444                        &request_body,
 445                        first_event_at,
 446                        country_code.clone(),
 447                    ))
 448            }
 449            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 450                event.clone(),
 451                &wrapper,
 452                &request_body,
 453                first_event_at,
 454            )),
 455            Event::Assistant(event) => {
 456                to_upload
 457                    .assistant_events
 458                    .push(AssistantEventRow::from_event(
 459                        event.clone(),
 460                        &wrapper,
 461                        &request_body,
 462                        first_event_at,
 463                    ))
 464            }
 465            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 466                event.clone(),
 467                &wrapper,
 468                &request_body,
 469                first_event_at,
 470            )),
 471            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 472                event.clone(),
 473                &wrapper,
 474                &request_body,
 475                first_event_at,
 476            )),
 477            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 478                event.clone(),
 479                &wrapper,
 480                &request_body,
 481                first_event_at,
 482            )),
 483            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 484                event.clone(),
 485                &wrapper,
 486                &request_body,
 487                first_event_at,
 488            )),
 489            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 490                event.clone(),
 491                &wrapper,
 492                &request_body,
 493                first_event_at,
 494            )),
 495            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 496                event.clone(),
 497                &wrapper,
 498                &request_body,
 499                first_event_at,
 500            )),
 501            Event::Extension(event) => {
 502                let metadata = app
 503                    .db
 504                    .get_extension_version(&event.extension_id, &event.version)
 505                    .await?;
 506                to_upload
 507                    .extension_events
 508                    .push(ExtensionEventRow::from_event(
 509                        event.clone(),
 510                        &wrapper,
 511                        &request_body,
 512                        metadata,
 513                        first_event_at,
 514                    ))
 515            }
 516        }
 517    }
 518
 519    to_upload
 520        .upload(&clickhouse_client)
 521        .await
 522        .map_err(|err| Error::Internal(anyhow!(err)))?;
 523
 524    Ok(())
 525}
 526
 527#[derive(Default)]
 528struct ToUpload {
 529    editor_events: Vec<EditorEventRow>,
 530    inline_completion_events: Vec<InlineCompletionEventRow>,
 531    assistant_events: Vec<AssistantEventRow>,
 532    call_events: Vec<CallEventRow>,
 533    cpu_events: Vec<CpuEventRow>,
 534    memory_events: Vec<MemoryEventRow>,
 535    app_events: Vec<AppEventRow>,
 536    setting_events: Vec<SettingEventRow>,
 537    extension_events: Vec<ExtensionEventRow>,
 538    edit_events: Vec<EditEventRow>,
 539    action_events: Vec<ActionEventRow>,
 540}
 541
 542impl ToUpload {
 543    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 544        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 545        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 546            .await
 547            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 548
 549        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 550        Self::upload_to_table(
 551            INLINE_COMPLETION_EVENTS_TABLE,
 552            &self.inline_completion_events,
 553            clickhouse_client,
 554        )
 555        .await
 556        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 557
 558        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 559        Self::upload_to_table(
 560            ASSISTANT_EVENTS_TABLE,
 561            &self.assistant_events,
 562            clickhouse_client,
 563        )
 564        .await
 565        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 566
 567        const CALL_EVENTS_TABLE: &str = "call_events";
 568        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 569            .await
 570            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 571
 572        const CPU_EVENTS_TABLE: &str = "cpu_events";
 573        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 574            .await
 575            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 576
 577        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 578        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 579            .await
 580            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 581
 582        const APP_EVENTS_TABLE: &str = "app_events";
 583        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 584            .await
 585            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 586
 587        const SETTING_EVENTS_TABLE: &str = "setting_events";
 588        Self::upload_to_table(
 589            SETTING_EVENTS_TABLE,
 590            &self.setting_events,
 591            clickhouse_client,
 592        )
 593        .await
 594        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 595
 596        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 597        Self::upload_to_table(
 598            EXTENSION_EVENTS_TABLE,
 599            &self.extension_events,
 600            clickhouse_client,
 601        )
 602        .await
 603        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 604
 605        const EDIT_EVENTS_TABLE: &str = "edit_events";
 606        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 607            .await
 608            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 609
 610        const ACTION_EVENTS_TABLE: &str = "action_events";
 611        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 612            .await
 613            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 614
 615        Ok(())
 616    }
 617
 618    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 619        table: &str,
 620        rows: &[T],
 621        clickhouse_client: &clickhouse::Client,
 622    ) -> anyhow::Result<()> {
 623        if !rows.is_empty() {
 624            let mut insert = clickhouse_client.insert(table)?;
 625
 626            for event in rows {
 627                insert.write(event).await?;
 628            }
 629
 630            insert.end().await?;
 631
 632            let event_count = rows.len();
 633            log::info!(
 634                "wrote {event_count} {event_specifier} to '{table}'",
 635                event_specifier = if event_count == 1 { "event" } else { "events" }
 636            );
 637        }
 638
 639        Ok(())
 640    }
 641}
 642
 643pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 644where
 645    S: Serializer,
 646{
 647    if country_code.len() != 2 {
 648        use serde::ser::Error;
 649        return Err(S::Error::custom(
 650            "country_code must be exactly 2 characters",
 651        ));
 652    }
 653
 654    let country_code = country_code.as_bytes();
 655
 656    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 657}
 658
 659#[derive(Serialize, Debug, clickhouse::Row)]
 660pub struct EditorEventRow {
 661    pub installation_id: String,
 662    pub operation: String,
 663    pub app_version: String,
 664    pub file_extension: String,
 665    pub os_name: String,
 666    pub os_version: String,
 667    pub release_channel: String,
 668    pub signed_in: bool,
 669    pub vim_mode: bool,
 670    #[serde(serialize_with = "serialize_country_code")]
 671    pub country_code: String,
 672    pub region_code: String,
 673    pub city: String,
 674    pub time: i64,
 675    pub copilot_enabled: bool,
 676    pub copilot_enabled_for_language: bool,
 677    pub historical_event: bool,
 678    pub architecture: String,
 679    pub is_staff: Option<bool>,
 680    pub session_id: Option<String>,
 681    pub major: Option<i32>,
 682    pub minor: Option<i32>,
 683    pub patch: Option<i32>,
 684}
 685
 686impl EditorEventRow {
 687    fn from_event(
 688        event: EditorEvent,
 689        wrapper: &EventWrapper,
 690        body: &EventRequestBody,
 691        first_event_at: chrono::DateTime<chrono::Utc>,
 692        country_code: Option<String>,
 693    ) -> Self {
 694        let semver = body.semver();
 695        let time =
 696            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 697
 698        Self {
 699            app_version: body.app_version.clone(),
 700            major: semver.map(|v| v.major() as i32),
 701            minor: semver.map(|v| v.minor() as i32),
 702            patch: semver.map(|v| v.patch() as i32),
 703            release_channel: body.release_channel.clone().unwrap_or_default(),
 704            os_name: body.os_name.clone(),
 705            os_version: body.os_version.clone().unwrap_or_default(),
 706            architecture: body.architecture.clone(),
 707            installation_id: body.installation_id.clone().unwrap_or_default(),
 708            session_id: body.session_id.clone(),
 709            is_staff: body.is_staff,
 710            time: time.timestamp_millis(),
 711            operation: event.operation,
 712            file_extension: event.file_extension.unwrap_or_default(),
 713            signed_in: wrapper.signed_in,
 714            vim_mode: event.vim_mode,
 715            copilot_enabled: event.copilot_enabled,
 716            copilot_enabled_for_language: event.copilot_enabled_for_language,
 717            country_code: country_code.unwrap_or("XX".to_string()),
 718            region_code: "".to_string(),
 719            city: "".to_string(),
 720            historical_event: false,
 721        }
 722    }
 723}
 724
 725#[derive(Serialize, Debug, clickhouse::Row)]
 726pub struct InlineCompletionEventRow {
 727    pub installation_id: String,
 728    pub provider: String,
 729    pub suggestion_accepted: bool,
 730    pub app_version: String,
 731    pub file_extension: String,
 732    pub os_name: String,
 733    pub os_version: String,
 734    pub release_channel: String,
 735    pub signed_in: bool,
 736    #[serde(serialize_with = "serialize_country_code")]
 737    pub country_code: String,
 738    pub region_code: String,
 739    pub city: String,
 740    pub time: i64,
 741    pub is_staff: Option<bool>,
 742    pub session_id: Option<String>,
 743    pub major: Option<i32>,
 744    pub minor: Option<i32>,
 745    pub patch: Option<i32>,
 746}
 747
 748impl InlineCompletionEventRow {
 749    fn from_event(
 750        event: InlineCompletionEvent,
 751        wrapper: &EventWrapper,
 752        body: &EventRequestBody,
 753        first_event_at: chrono::DateTime<chrono::Utc>,
 754        country_code: Option<String>,
 755    ) -> Self {
 756        let semver = body.semver();
 757        let time =
 758            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 759
 760        Self {
 761            app_version: body.app_version.clone(),
 762            major: semver.map(|v| v.major() as i32),
 763            minor: semver.map(|v| v.minor() as i32),
 764            patch: semver.map(|v| v.patch() as i32),
 765            release_channel: body.release_channel.clone().unwrap_or_default(),
 766            os_name: body.os_name.clone(),
 767            os_version: body.os_version.clone().unwrap_or_default(),
 768            installation_id: body.installation_id.clone().unwrap_or_default(),
 769            session_id: body.session_id.clone(),
 770            is_staff: body.is_staff,
 771            time: time.timestamp_millis(),
 772            file_extension: event.file_extension.unwrap_or_default(),
 773            signed_in: wrapper.signed_in,
 774            country_code: country_code.unwrap_or("XX".to_string()),
 775            region_code: "".to_string(),
 776            city: "".to_string(),
 777            provider: event.provider,
 778            suggestion_accepted: event.suggestion_accepted,
 779        }
 780    }
 781}
 782
 783#[derive(Serialize, Debug, clickhouse::Row)]
 784pub struct CallEventRow {
 785    // AppInfoBase
 786    app_version: String,
 787    major: Option<i32>,
 788    minor: Option<i32>,
 789    patch: Option<i32>,
 790    release_channel: String,
 791
 792    // ClientEventBase
 793    installation_id: String,
 794    session_id: Option<String>,
 795    is_staff: Option<bool>,
 796    time: i64,
 797
 798    // CallEventRow
 799    operation: String,
 800    room_id: Option<u64>,
 801    channel_id: Option<u64>,
 802}
 803
 804impl CallEventRow {
 805    fn from_event(
 806        event: CallEvent,
 807        wrapper: &EventWrapper,
 808        body: &EventRequestBody,
 809        first_event_at: chrono::DateTime<chrono::Utc>,
 810    ) -> Self {
 811        let semver = body.semver();
 812        let time =
 813            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 814
 815        Self {
 816            app_version: body.app_version.clone(),
 817            major: semver.map(|v| v.major() as i32),
 818            minor: semver.map(|v| v.minor() as i32),
 819            patch: semver.map(|v| v.patch() as i32),
 820            release_channel: body.release_channel.clone().unwrap_or_default(),
 821            installation_id: body.installation_id.clone().unwrap_or_default(),
 822            session_id: body.session_id.clone(),
 823            is_staff: body.is_staff,
 824            time: time.timestamp_millis(),
 825            operation: event.operation,
 826            room_id: event.room_id,
 827            channel_id: event.channel_id,
 828        }
 829    }
 830}
 831
 832#[derive(Serialize, Debug, clickhouse::Row)]
 833pub struct AssistantEventRow {
 834    // AppInfoBase
 835    app_version: String,
 836    major: Option<i32>,
 837    minor: Option<i32>,
 838    patch: Option<i32>,
 839    release_channel: String,
 840
 841    // ClientEventBase
 842    installation_id: Option<String>,
 843    session_id: Option<String>,
 844    is_staff: Option<bool>,
 845    time: i64,
 846
 847    // AssistantEventRow
 848    conversation_id: String,
 849    kind: String,
 850    model: String,
 851    response_latency_in_ms: Option<i64>,
 852    error_message: Option<String>,
 853}
 854
 855impl AssistantEventRow {
 856    fn from_event(
 857        event: AssistantEvent,
 858        wrapper: &EventWrapper,
 859        body: &EventRequestBody,
 860        first_event_at: chrono::DateTime<chrono::Utc>,
 861    ) -> Self {
 862        let semver = body.semver();
 863        let time =
 864            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 865
 866        Self {
 867            app_version: body.app_version.clone(),
 868            major: semver.map(|v| v.major() as i32),
 869            minor: semver.map(|v| v.minor() as i32),
 870            patch: semver.map(|v| v.patch() as i32),
 871            release_channel: body.release_channel.clone().unwrap_or_default(),
 872            installation_id: body.installation_id.clone(),
 873            session_id: body.session_id.clone(),
 874            is_staff: body.is_staff,
 875            time: time.timestamp_millis(),
 876            conversation_id: event.conversation_id.unwrap_or_default(),
 877            kind: event.kind.to_string(),
 878            model: event.model,
 879            response_latency_in_ms: event
 880                .response_latency
 881                .map(|latency| latency.as_millis() as i64),
 882            error_message: event.error_message,
 883        }
 884    }
 885}
 886
 887#[derive(Debug, clickhouse::Row, Serialize)]
 888pub struct CpuEventRow {
 889    pub installation_id: Option<String>,
 890    pub is_staff: Option<bool>,
 891    pub usage_as_percentage: f32,
 892    pub core_count: u32,
 893    pub app_version: String,
 894    pub release_channel: String,
 895    pub time: i64,
 896    pub session_id: Option<String>,
 897    // pub normalized_cpu_usage: f64, MATERIALIZED
 898    pub major: Option<i32>,
 899    pub minor: Option<i32>,
 900    pub patch: Option<i32>,
 901}
 902
 903impl CpuEventRow {
 904    fn from_event(
 905        event: CpuEvent,
 906        wrapper: &EventWrapper,
 907        body: &EventRequestBody,
 908        first_event_at: chrono::DateTime<chrono::Utc>,
 909    ) -> Self {
 910        let semver = body.semver();
 911        let time =
 912            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 913
 914        Self {
 915            app_version: body.app_version.clone(),
 916            major: semver.map(|v| v.major() as i32),
 917            minor: semver.map(|v| v.minor() as i32),
 918            patch: semver.map(|v| v.patch() as i32),
 919            release_channel: body.release_channel.clone().unwrap_or_default(),
 920            installation_id: body.installation_id.clone(),
 921            session_id: body.session_id.clone(),
 922            is_staff: body.is_staff,
 923            time: time.timestamp_millis(),
 924            usage_as_percentage: event.usage_as_percentage,
 925            core_count: event.core_count,
 926        }
 927    }
 928}
 929
 930#[derive(Serialize, Debug, clickhouse::Row)]
 931pub struct MemoryEventRow {
 932    // AppInfoBase
 933    app_version: String,
 934    major: Option<i32>,
 935    minor: Option<i32>,
 936    patch: Option<i32>,
 937    release_channel: String,
 938
 939    // ClientEventBase
 940    installation_id: Option<String>,
 941    session_id: Option<String>,
 942    is_staff: Option<bool>,
 943    time: i64,
 944
 945    // MemoryEventRow
 946    memory_in_bytes: u64,
 947    virtual_memory_in_bytes: u64,
 948}
 949
 950impl MemoryEventRow {
 951    fn from_event(
 952        event: MemoryEvent,
 953        wrapper: &EventWrapper,
 954        body: &EventRequestBody,
 955        first_event_at: chrono::DateTime<chrono::Utc>,
 956    ) -> Self {
 957        let semver = body.semver();
 958        let time =
 959            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 960
 961        Self {
 962            app_version: body.app_version.clone(),
 963            major: semver.map(|v| v.major() as i32),
 964            minor: semver.map(|v| v.minor() as i32),
 965            patch: semver.map(|v| v.patch() as i32),
 966            release_channel: body.release_channel.clone().unwrap_or_default(),
 967            installation_id: body.installation_id.clone(),
 968            session_id: body.session_id.clone(),
 969            is_staff: body.is_staff,
 970            time: time.timestamp_millis(),
 971            memory_in_bytes: event.memory_in_bytes,
 972            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 973        }
 974    }
 975}
 976
 977#[derive(Serialize, Debug, clickhouse::Row)]
 978pub struct AppEventRow {
 979    // AppInfoBase
 980    app_version: String,
 981    major: Option<i32>,
 982    minor: Option<i32>,
 983    patch: Option<i32>,
 984    release_channel: String,
 985
 986    // ClientEventBase
 987    installation_id: Option<String>,
 988    session_id: Option<String>,
 989    is_staff: Option<bool>,
 990    time: i64,
 991
 992    // AppEventRow
 993    operation: String,
 994}
 995
 996impl AppEventRow {
 997    fn from_event(
 998        event: AppEvent,
 999        wrapper: &EventWrapper,
1000        body: &EventRequestBody,
1001        first_event_at: chrono::DateTime<chrono::Utc>,
1002    ) -> Self {
1003        let semver = body.semver();
1004        let time =
1005            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1006
1007        Self {
1008            app_version: body.app_version.clone(),
1009            major: semver.map(|v| v.major() as i32),
1010            minor: semver.map(|v| v.minor() as i32),
1011            patch: semver.map(|v| v.patch() as i32),
1012            release_channel: body.release_channel.clone().unwrap_or_default(),
1013            installation_id: body.installation_id.clone(),
1014            session_id: body.session_id.clone(),
1015            is_staff: body.is_staff,
1016            time: time.timestamp_millis(),
1017            operation: event.operation,
1018        }
1019    }
1020}
1021
1022#[derive(Serialize, Debug, clickhouse::Row)]
1023pub struct SettingEventRow {
1024    // AppInfoBase
1025    app_version: String,
1026    major: Option<i32>,
1027    minor: Option<i32>,
1028    patch: Option<i32>,
1029    release_channel: String,
1030
1031    // ClientEventBase
1032    installation_id: Option<String>,
1033    session_id: Option<String>,
1034    is_staff: Option<bool>,
1035    time: i64,
1036    // SettingEventRow
1037    setting: String,
1038    value: String,
1039}
1040
1041impl SettingEventRow {
1042    fn from_event(
1043        event: SettingEvent,
1044        wrapper: &EventWrapper,
1045        body: &EventRequestBody,
1046        first_event_at: chrono::DateTime<chrono::Utc>,
1047    ) -> Self {
1048        let semver = body.semver();
1049        let time =
1050            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1051
1052        Self {
1053            app_version: body.app_version.clone(),
1054            major: semver.map(|v| v.major() as i32),
1055            minor: semver.map(|v| v.minor() as i32),
1056            patch: semver.map(|v| v.patch() as i32),
1057            release_channel: body.release_channel.clone().unwrap_or_default(),
1058            installation_id: body.installation_id.clone(),
1059            session_id: body.session_id.clone(),
1060            is_staff: body.is_staff,
1061            time: time.timestamp_millis(),
1062            setting: event.setting,
1063            value: event.value,
1064        }
1065    }
1066}
1067
1068#[derive(Serialize, Debug, clickhouse::Row)]
1069pub struct ExtensionEventRow {
1070    // AppInfoBase
1071    app_version: String,
1072    major: Option<i32>,
1073    minor: Option<i32>,
1074    patch: Option<i32>,
1075    release_channel: String,
1076
1077    // ClientEventBase
1078    installation_id: Option<String>,
1079    session_id: Option<String>,
1080    is_staff: Option<bool>,
1081    time: i64,
1082
1083    // ExtensionEventRow
1084    extension_id: Arc<str>,
1085    extension_version: Arc<str>,
1086    dev: bool,
1087    schema_version: Option<i32>,
1088    wasm_api_version: Option<String>,
1089}
1090
1091impl ExtensionEventRow {
1092    fn from_event(
1093        event: ExtensionEvent,
1094        wrapper: &EventWrapper,
1095        body: &EventRequestBody,
1096        extension_metadata: Option<ExtensionMetadata>,
1097        first_event_at: chrono::DateTime<chrono::Utc>,
1098    ) -> Self {
1099        let semver = body.semver();
1100        let time =
1101            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1102
1103        Self {
1104            app_version: body.app_version.clone(),
1105            major: semver.map(|v| v.major() as i32),
1106            minor: semver.map(|v| v.minor() as i32),
1107            patch: semver.map(|v| v.patch() as i32),
1108            release_channel: body.release_channel.clone().unwrap_or_default(),
1109            installation_id: body.installation_id.clone(),
1110            session_id: body.session_id.clone(),
1111            is_staff: body.is_staff,
1112            time: time.timestamp_millis(),
1113            extension_id: event.extension_id,
1114            extension_version: event.version,
1115            dev: extension_metadata.is_none(),
1116            schema_version: extension_metadata
1117                .as_ref()
1118                .and_then(|metadata| metadata.manifest.schema_version),
1119            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1120                metadata
1121                    .manifest
1122                    .wasm_api_version
1123                    .as_ref()
1124                    .map(|version| version.to_string())
1125            }),
1126        }
1127    }
1128}
1129
1130#[derive(Serialize, Debug, clickhouse::Row)]
1131pub struct EditEventRow {
1132    // AppInfoBase
1133    app_version: String,
1134    major: Option<i32>,
1135    minor: Option<i32>,
1136    patch: Option<i32>,
1137    release_channel: String,
1138
1139    // ClientEventBase
1140    installation_id: Option<String>,
1141    // Note: This column name has a typo in the ClickHouse table.
1142    #[serde(rename = "sesssion_id")]
1143    session_id: Option<String>,
1144    is_staff: Option<bool>,
1145    time: i64,
1146
1147    // EditEventRow
1148    period_start: i64,
1149    period_end: i64,
1150    environment: String,
1151}
1152
1153impl EditEventRow {
1154    fn from_event(
1155        event: EditEvent,
1156        wrapper: &EventWrapper,
1157        body: &EventRequestBody,
1158        first_event_at: chrono::DateTime<chrono::Utc>,
1159    ) -> Self {
1160        let semver = body.semver();
1161        let time =
1162            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1163
1164        let period_start = time - chrono::Duration::milliseconds(event.duration);
1165        let period_end = time;
1166
1167        Self {
1168            app_version: body.app_version.clone(),
1169            major: semver.map(|v| v.major() as i32),
1170            minor: semver.map(|v| v.minor() as i32),
1171            patch: semver.map(|v| v.patch() as i32),
1172            release_channel: body.release_channel.clone().unwrap_or_default(),
1173            installation_id: body.installation_id.clone(),
1174            session_id: body.session_id.clone(),
1175            is_staff: body.is_staff,
1176            time: time.timestamp_millis(),
1177            period_start: period_start.timestamp_millis(),
1178            period_end: period_end.timestamp_millis(),
1179            environment: event.environment,
1180        }
1181    }
1182}
1183
1184#[derive(Serialize, Debug, clickhouse::Row)]
1185pub struct ActionEventRow {
1186    // AppInfoBase
1187    app_version: String,
1188    major: Option<i32>,
1189    minor: Option<i32>,
1190    patch: Option<i32>,
1191    release_channel: String,
1192
1193    // ClientEventBase
1194    installation_id: Option<String>,
1195    // Note: This column name has a typo in the ClickHouse table.
1196    #[serde(rename = "sesssion_id")]
1197    session_id: Option<String>,
1198    is_staff: Option<bool>,
1199    time: i64,
1200    // ActionEventRow
1201    source: String,
1202    action: String,
1203}
1204
1205impl ActionEventRow {
1206    fn from_event(
1207        event: ActionEvent,
1208        wrapper: &EventWrapper,
1209        body: &EventRequestBody,
1210        first_event_at: chrono::DateTime<chrono::Utc>,
1211    ) -> Self {
1212        let semver = body.semver();
1213        let time =
1214            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1215
1216        Self {
1217            app_version: body.app_version.clone(),
1218            major: semver.map(|v| v.major() as i32),
1219            minor: semver.map(|v| v.minor() as i32),
1220            patch: semver.map(|v| v.patch() as i32),
1221            release_channel: body.release_channel.clone().unwrap_or_default(),
1222            installation_id: body.installation_id.clone(),
1223            session_id: body.session_id.clone(),
1224            is_staff: body.is_staff,
1225            time: time.timestamp_millis(),
1226            source: event.source,
1227            action: event.action,
1228        }
1229    }
1230}
1231
1232pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1233    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1234        return None;
1235    };
1236
1237    let mut summer = Sha256::new();
1238    summer.update(checksum_seed);
1239    summer.update(&json);
1240    summer.update(checksum_seed);
1241    Some(summer.finalize().into_iter().collect())
1242}