events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
  19    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent,
  20    SettingEvent,
  21};
  22use uuid::Uuid;
  23
/// Bucket name handed to the blob-store client when looking up and storing
/// crash (`.ips`) and hang (`.hang.json`) reports; also used to build the
/// incident link posted to Slack.
static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  25
  26pub fn router() -> Router {
  27    Router::new()
  28        .route("/telemetry/events", post(post_events))
  29        .route("/telemetry/crashes", post(post_crash))
  30        .route("/telemetry/panics", post(post_panic))
  31        .route("/telemetry/hangs", post(post_hang))
  32}
  33
  34pub struct ZedChecksumHeader(Vec<u8>);
  35
  36impl Header for ZedChecksumHeader {
  37    fn name() -> &'static HeaderName {
  38        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  39        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  40    }
  41
  42    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  43    where
  44        Self: Sized,
  45        I: Iterator<Item = &'i axum::http::HeaderValue>,
  46    {
  47        let checksum = values
  48            .next()
  49            .ok_or_else(axum::headers::Error::invalid)?
  50            .to_str()
  51            .map_err(|_| axum::headers::Error::invalid())?;
  52
  53        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  54        Ok(Self(bytes))
  55    }
  56
  57    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  58        unimplemented!()
  59    }
  60}
  61
  62pub struct CloudflareIpCountryHeader(String);
  63
  64impl Header for CloudflareIpCountryHeader {
  65    fn name() -> &'static HeaderName {
  66        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  67        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  68    }
  69
  70    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  71    where
  72        Self: Sized,
  73        I: Iterator<Item = &'i axum::http::HeaderValue>,
  74    {
  75        let country_code = values
  76            .next()
  77            .ok_or_else(axum::headers::Error::invalid)?
  78            .to_str()
  79            .map_err(|_| axum::headers::Error::invalid())?;
  80
  81        Ok(Self(country_code.to_string()))
  82    }
  83
  84    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  85        unimplemented!()
  86    }
  87}
  88
  89pub async fn post_crash(
  90    Extension(app): Extension<Arc<AppState>>,
  91    headers: HeaderMap,
  92    body: Bytes,
  93) -> Result<()> {
  94    let report = IpsFile::parse(&body)?;
  95    let version_threshold = SemanticVersion::new(0, 123, 0);
  96
  97    let bundle_id = &report.header.bundle_id;
  98    let app_version = &report.app_version();
  99
 100    if bundle_id == "dev.zed.Zed-Dev" {
 101        log::error!("Crash uploads from {} are ignored.", bundle_id);
 102        return Ok(());
 103    }
 104
 105    if app_version.is_none() || app_version.unwrap() < version_threshold {
 106        log::error!(
 107            "Crash uploads from {} are ignored.",
 108            report.header.app_version
 109        );
 110        return Ok(());
 111    }
 112    let app_version = app_version.unwrap();
 113
 114    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 115        let response = blob_store_client
 116            .head_object()
 117            .bucket(CRASH_REPORTS_BUCKET)
 118            .key(report.header.incident_id.clone() + ".ips")
 119            .send()
 120            .await;
 121
 122        if response.is_ok() {
 123            log::info!("We've already uploaded this crash");
 124            return Ok(());
 125        }
 126
 127        blob_store_client
 128            .put_object()
 129            .bucket(CRASH_REPORTS_BUCKET)
 130            .key(report.header.incident_id.clone() + ".ips")
 131            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 132            .body(ByteStream::from(body.to_vec()))
 133            .send()
 134            .await
 135            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 136            .ok();
 137    }
 138
 139    let recent_panic_on: Option<i64> = headers
 140        .get("x-zed-panicked-on")
 141        .and_then(|h| h.to_str().ok())
 142        .and_then(|s| s.parse().ok());
 143
 144    let installation_id = headers
 145        .get("x-zed-installation-id")
 146        .and_then(|h| h.to_str().ok())
 147        .map(|s| s.to_string())
 148        .unwrap_or_default();
 149
 150    let mut recent_panic = None;
 151
 152    if let Some(recent_panic_on) = recent_panic_on {
 153        let crashed_at = match report.timestamp() {
 154            Ok(t) => Some(t),
 155            Err(e) => {
 156                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 157                None
 158            }
 159        };
 160        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 161            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 162        }
 163    }
 164
 165    let description = report.description(recent_panic);
 166    let summary = report.backtrace_summary();
 167
 168    tracing::error!(
 169        service = "client",
 170        version = %report.header.app_version,
 171        os_version = %report.header.os_version,
 172        bundle_id = %report.header.bundle_id,
 173        incident_id = %report.header.incident_id,
 174        installation_id = %installation_id,
 175        description = %description,
 176        backtrace = %summary,
 177        "crash report");
 178
 179    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 180        let payload = slack::WebhookBody::new(|w| {
 181            w.add_section(|s| s.text(slack::Text::markdown(description)))
 182                .add_section(|s| {
 183                    s.add_field(slack::Text::markdown(format!(
 184                        "*Version:*\n{} ({})",
 185                        bundle_id, app_version
 186                    )))
 187                    .add_field({
 188                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 189                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 190                            hostname.strip_prefix("http://").unwrap_or_default()
 191                        });
 192
 193                        slack::Text::markdown(format!(
 194                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 195                            CRASH_REPORTS_BUCKET,
 196                            hostname,
 197                            report.header.incident_id,
 198                            report
 199                                .header
 200                                .incident_id
 201                                .chars()
 202                                .take(8)
 203                                .collect::<String>(),
 204                        ))
 205                    })
 206                })
 207                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 208        });
 209        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 210            log::error!("Failed to serialize payload to JSON: {err}");
 211            Error::Internal(anyhow!(err))
 212        })?;
 213
 214        reqwest::Client::new()
 215            .post(slack_panics_webhook)
 216            .header("Content-Type", "application/json")
 217            .body(payload_json)
 218            .send()
 219            .await
 220            .map_err(|err| {
 221                log::error!("Failed to send payload to Slack: {err}");
 222                Error::Internal(anyhow!(err))
 223            })?;
 224    }
 225
 226    Ok(())
 227}
 228
 229pub async fn post_hang(
 230    Extension(app): Extension<Arc<AppState>>,
 231    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 232    body: Bytes,
 233) -> Result<()> {
 234    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 235        return Err(Error::Http(
 236            StatusCode::INTERNAL_SERVER_ERROR,
 237            "events not enabled".into(),
 238        ))?;
 239    };
 240
 241    if checksum != expected {
 242        return Err(Error::Http(
 243            StatusCode::BAD_REQUEST,
 244            "invalid checksum".into(),
 245        ))?;
 246    }
 247
 248    let incident_id = Uuid::new_v4().to_string();
 249
 250    // dump JSON into S3 so we can get frame offsets if we need to.
 251    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 252        blob_store_client
 253            .put_object()
 254            .bucket(CRASH_REPORTS_BUCKET)
 255            .key(incident_id.clone() + ".hang.json")
 256            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 257            .body(ByteStream::from(body.to_vec()))
 258            .send()
 259            .await
 260            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 261            .ok();
 262    }
 263
 264    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 265        log::error!("can't parse report json: {err}");
 266        Error::Internal(anyhow!(err))
 267    })?;
 268
 269    let mut backtrace = "Possible hang detected on main thread:".to_string();
 270    let unknown = "<unknown>".to_string();
 271    for frame in report.backtrace.iter() {
 272        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 273    }
 274
 275    tracing::error!(
 276        service = "client",
 277        version = %report.app_version.unwrap_or_default().to_string(),
 278        os_name = %report.os_name,
 279        os_version = report.os_version.unwrap_or_default().to_string(),
 280        incident_id = %incident_id,
 281        installation_id = %report.installation_id.unwrap_or_default(),
 282        backtrace = %backtrace,
 283        "hang report");
 284
 285    Ok(())
 286}
 287
 288pub async fn post_panic(
 289    Extension(app): Extension<Arc<AppState>>,
 290    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 291    body: Bytes,
 292) -> Result<()> {
 293    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 294        return Err(Error::Http(
 295            StatusCode::INTERNAL_SERVER_ERROR,
 296            "events not enabled".into(),
 297        ))?;
 298    };
 299
 300    if checksum != expected {
 301        return Err(Error::Http(
 302            StatusCode::BAD_REQUEST,
 303            "invalid checksum".into(),
 304        ))?;
 305    }
 306
 307    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
 308        .map_err(|_| Error::Http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
 309    let panic = report.panic;
 310
 311    tracing::error!(
 312        service = "client",
 313        version = %panic.app_version,
 314        os_name = %panic.os_name,
 315        os_version = %panic.os_version.clone().unwrap_or_default(),
 316        installation_id = %panic.installation_id.unwrap_or_default(),
 317        description = %panic.payload,
 318        backtrace = %panic.backtrace.join("\n"),
 319        "panic report");
 320
 321    let backtrace = if panic.backtrace.len() > 25 {
 322        let total = panic.backtrace.len();
 323        format!(
 324            "{}\n   and {} more",
 325            panic
 326                .backtrace
 327                .iter()
 328                .take(20)
 329                .cloned()
 330                .collect::<Vec<_>>()
 331                .join("\n"),
 332            total - 20
 333        )
 334    } else {
 335        panic.backtrace.join("\n")
 336    };
 337    let backtrace_with_summary = panic.payload + "\n" + &backtrace;
 338
 339    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 340        let payload = slack::WebhookBody::new(|w| {
 341            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
 342                .add_section(|s| {
 343                    s.add_field(slack::Text::markdown(format!(
 344                        "*Version:*\n {} ",
 345                        panic.app_version
 346                    )))
 347                    .add_field({
 348                        slack::Text::markdown(format!(
 349                            "*OS:*\n{} {}",
 350                            panic.os_name,
 351                            panic.os_version.unwrap_or_default()
 352                        ))
 353                    })
 354                })
 355                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
 356        });
 357        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 358            log::error!("Failed to serialize payload to JSON: {err}");
 359            Error::Internal(anyhow!(err))
 360        })?;
 361
 362        reqwest::Client::new()
 363            .post(slack_panics_webhook)
 364            .header("Content-Type", "application/json")
 365            .body(payload_json)
 366            .send()
 367            .await
 368            .map_err(|err| {
 369                log::error!("Failed to send payload to Slack: {err}");
 370                Error::Internal(anyhow!(err))
 371            })?;
 372    }
 373
 374    Ok(())
 375}
 376
 377pub async fn post_events(
 378    Extension(app): Extension<Arc<AppState>>,
 379    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 380    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 381    body: Bytes,
 382) -> Result<()> {
 383    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 384        Err(Error::Http(
 385            StatusCode::NOT_IMPLEMENTED,
 386            "not supported".into(),
 387        ))?
 388    };
 389
 390    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 391        return Err(Error::Http(
 392            StatusCode::INTERNAL_SERVER_ERROR,
 393            "events not enabled".into(),
 394        ))?;
 395    };
 396
 397    if checksum != expected {
 398        return Err(Error::Http(
 399            StatusCode::BAD_REQUEST,
 400            "invalid checksum".into(),
 401        ))?;
 402    }
 403
 404    let request_body: telemetry_events::EventRequestBody =
 405        serde_json::from_slice(&body).map_err(|err| {
 406            log::error!("can't parse event json: {err}");
 407            Error::Internal(anyhow!(err))
 408        })?;
 409
 410    let mut to_upload = ToUpload::default();
 411    let Some(last_event) = request_body.events.last() else {
 412        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 413    };
 414    let country_code = country_code_header.map(|h| h.0 .0);
 415
 416    let first_event_at = chrono::Utc::now()
 417        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 418
 419    for wrapper in &request_body.events {
 420        match &wrapper.event {
 421            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 422                event.clone(),
 423                &wrapper,
 424                &request_body,
 425                first_event_at,
 426                country_code.clone(),
 427            )),
 428            // Needed for clients sending old copilot_event types
 429            Event::Copilot(_) => {}
 430            Event::InlineCompletion(event) => {
 431                to_upload
 432                    .inline_completion_events
 433                    .push(InlineCompletionEventRow::from_event(
 434                        event.clone(),
 435                        &wrapper,
 436                        &request_body,
 437                        first_event_at,
 438                        country_code.clone(),
 439                    ))
 440            }
 441            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 442                event.clone(),
 443                &wrapper,
 444                &request_body,
 445                first_event_at,
 446            )),
 447            Event::Assistant(event) => {
 448                to_upload
 449                    .assistant_events
 450                    .push(AssistantEventRow::from_event(
 451                        event.clone(),
 452                        &wrapper,
 453                        &request_body,
 454                        first_event_at,
 455                    ))
 456            }
 457            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 458                event.clone(),
 459                &wrapper,
 460                &request_body,
 461                first_event_at,
 462            )),
 463            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 464                event.clone(),
 465                &wrapper,
 466                &request_body,
 467                first_event_at,
 468            )),
 469            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 470                event.clone(),
 471                &wrapper,
 472                &request_body,
 473                first_event_at,
 474            )),
 475            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 476                event.clone(),
 477                &wrapper,
 478                &request_body,
 479                first_event_at,
 480            )),
 481            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 482                event.clone(),
 483                &wrapper,
 484                &request_body,
 485                first_event_at,
 486            )),
 487            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 488                event.clone(),
 489                &wrapper,
 490                &request_body,
 491                first_event_at,
 492            )),
 493            Event::Extension(event) => {
 494                let metadata = app
 495                    .db
 496                    .get_extension_version(&event.extension_id, &event.version)
 497                    .await?;
 498                to_upload
 499                    .extension_events
 500                    .push(ExtensionEventRow::from_event(
 501                        event.clone(),
 502                        &wrapper,
 503                        &request_body,
 504                        metadata,
 505                        first_event_at,
 506                    ))
 507            }
 508        }
 509    }
 510
 511    to_upload
 512        .upload(&clickhouse_client)
 513        .await
 514        .map_err(|err| Error::Internal(anyhow!(err)))?;
 515
 516    Ok(())
 517}
 518
 519#[derive(Default)]
 520struct ToUpload {
 521    editor_events: Vec<EditorEventRow>,
 522    inline_completion_events: Vec<InlineCompletionEventRow>,
 523    assistant_events: Vec<AssistantEventRow>,
 524    call_events: Vec<CallEventRow>,
 525    cpu_events: Vec<CpuEventRow>,
 526    memory_events: Vec<MemoryEventRow>,
 527    app_events: Vec<AppEventRow>,
 528    setting_events: Vec<SettingEventRow>,
 529    extension_events: Vec<ExtensionEventRow>,
 530    edit_events: Vec<EditEventRow>,
 531    action_events: Vec<ActionEventRow>,
 532}
 533
 534impl ToUpload {
 535    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 536        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 537        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 538            .await
 539            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 540
 541        const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events";
 542        Self::upload_to_table(
 543            INLINE_COMPLETION_EVENTS_TABLE,
 544            &self.inline_completion_events,
 545            clickhouse_client,
 546        )
 547        .await
 548        .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?;
 549
 550        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 551        Self::upload_to_table(
 552            ASSISTANT_EVENTS_TABLE,
 553            &self.assistant_events,
 554            clickhouse_client,
 555        )
 556        .await
 557        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 558
 559        const CALL_EVENTS_TABLE: &str = "call_events";
 560        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 561            .await
 562            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 563
 564        const CPU_EVENTS_TABLE: &str = "cpu_events";
 565        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 566            .await
 567            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 568
 569        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 570        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 571            .await
 572            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 573
 574        const APP_EVENTS_TABLE: &str = "app_events";
 575        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 576            .await
 577            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 578
 579        const SETTING_EVENTS_TABLE: &str = "setting_events";
 580        Self::upload_to_table(
 581            SETTING_EVENTS_TABLE,
 582            &self.setting_events,
 583            clickhouse_client,
 584        )
 585        .await
 586        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 587
 588        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 589        Self::upload_to_table(
 590            EXTENSION_EVENTS_TABLE,
 591            &self.extension_events,
 592            clickhouse_client,
 593        )
 594        .await
 595        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 596
 597        const EDIT_EVENTS_TABLE: &str = "edit_events";
 598        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 599            .await
 600            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 601
 602        const ACTION_EVENTS_TABLE: &str = "action_events";
 603        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 604            .await
 605            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 606
 607        Ok(())
 608    }
 609
 610    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 611        table: &str,
 612        rows: &[T],
 613        clickhouse_client: &clickhouse::Client,
 614    ) -> anyhow::Result<()> {
 615        if !rows.is_empty() {
 616            let mut insert = clickhouse_client.insert(table)?;
 617
 618            for event in rows {
 619                insert.write(event).await?;
 620            }
 621
 622            insert.end().await?;
 623
 624            let event_count = rows.len();
 625            log::info!(
 626                "wrote {event_count} {event_specifier} to '{table}'",
 627                event_specifier = if event_count == 1 { "event" } else { "events" }
 628            );
 629        }
 630
 631        Ok(())
 632    }
 633}
 634
 635pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 636where
 637    S: Serializer,
 638{
 639    if country_code.len() != 2 {
 640        use serde::ser::Error;
 641        return Err(S::Error::custom(
 642            "country_code must be exactly 2 characters",
 643        ));
 644    }
 645
 646    let country_code = country_code.as_bytes();
 647
 648    serializer.serialize_u16(((country_code[1] as u16) << 8) + country_code[0] as u16)
 649}
 650
 651#[derive(Serialize, Debug, clickhouse::Row)]
 652pub struct EditorEventRow {
 653    pub installation_id: String,
 654    pub operation: String,
 655    pub app_version: String,
 656    pub file_extension: String,
 657    pub os_name: String,
 658    pub os_version: String,
 659    pub release_channel: String,
 660    pub signed_in: bool,
 661    pub vim_mode: bool,
 662    #[serde(serialize_with = "serialize_country_code")]
 663    pub country_code: String,
 664    pub region_code: String,
 665    pub city: String,
 666    pub time: i64,
 667    pub copilot_enabled: bool,
 668    pub copilot_enabled_for_language: bool,
 669    pub historical_event: bool,
 670    pub architecture: String,
 671    pub is_staff: Option<bool>,
 672    pub session_id: Option<String>,
 673    pub major: Option<i32>,
 674    pub minor: Option<i32>,
 675    pub patch: Option<i32>,
 676}
 677
 678impl EditorEventRow {
 679    fn from_event(
 680        event: EditorEvent,
 681        wrapper: &EventWrapper,
 682        body: &EventRequestBody,
 683        first_event_at: chrono::DateTime<chrono::Utc>,
 684        country_code: Option<String>,
 685    ) -> Self {
 686        let semver = body.semver();
 687        let time =
 688            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 689
 690        Self {
 691            app_version: body.app_version.clone(),
 692            major: semver.map(|v| v.major() as i32),
 693            minor: semver.map(|v| v.minor() as i32),
 694            patch: semver.map(|v| v.patch() as i32),
 695            release_channel: body.release_channel.clone().unwrap_or_default(),
 696            os_name: body.os_name.clone(),
 697            os_version: body.os_version.clone().unwrap_or_default(),
 698            architecture: body.architecture.clone(),
 699            installation_id: body.installation_id.clone().unwrap_or_default(),
 700            session_id: body.session_id.clone(),
 701            is_staff: body.is_staff,
 702            time: time.timestamp_millis(),
 703            operation: event.operation,
 704            file_extension: event.file_extension.unwrap_or_default(),
 705            signed_in: wrapper.signed_in,
 706            vim_mode: event.vim_mode,
 707            copilot_enabled: event.copilot_enabled,
 708            copilot_enabled_for_language: event.copilot_enabled_for_language,
 709            country_code: country_code.unwrap_or("XX".to_string()),
 710            region_code: "".to_string(),
 711            city: "".to_string(),
 712            historical_event: false,
 713        }
 714    }
 715}
 716
 717#[derive(Serialize, Debug, clickhouse::Row)]
 718pub struct InlineCompletionEventRow {
 719    pub installation_id: String,
 720    pub provider: String,
 721    pub suggestion_accepted: bool,
 722    pub app_version: String,
 723    pub file_extension: String,
 724    pub os_name: String,
 725    pub os_version: String,
 726    pub release_channel: String,
 727    pub signed_in: bool,
 728    #[serde(serialize_with = "serialize_country_code")]
 729    pub country_code: String,
 730    pub region_code: String,
 731    pub city: String,
 732    pub time: i64,
 733    pub is_staff: Option<bool>,
 734    pub session_id: Option<String>,
 735    pub major: Option<i32>,
 736    pub minor: Option<i32>,
 737    pub patch: Option<i32>,
 738}
 739
 740impl InlineCompletionEventRow {
 741    fn from_event(
 742        event: InlineCompletionEvent,
 743        wrapper: &EventWrapper,
 744        body: &EventRequestBody,
 745        first_event_at: chrono::DateTime<chrono::Utc>,
 746        country_code: Option<String>,
 747    ) -> Self {
 748        let semver = body.semver();
 749        let time =
 750            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 751
 752        Self {
 753            app_version: body.app_version.clone(),
 754            major: semver.map(|v| v.major() as i32),
 755            minor: semver.map(|v| v.minor() as i32),
 756            patch: semver.map(|v| v.patch() as i32),
 757            release_channel: body.release_channel.clone().unwrap_or_default(),
 758            os_name: body.os_name.clone(),
 759            os_version: body.os_version.clone().unwrap_or_default(),
 760            installation_id: body.installation_id.clone().unwrap_or_default(),
 761            session_id: body.session_id.clone(),
 762            is_staff: body.is_staff,
 763            time: time.timestamp_millis(),
 764            file_extension: event.file_extension.unwrap_or_default(),
 765            signed_in: wrapper.signed_in,
 766            country_code: country_code.unwrap_or("XX".to_string()),
 767            region_code: "".to_string(),
 768            city: "".to_string(),
 769            provider: event.provider,
 770            suggestion_accepted: event.suggestion_accepted,
 771        }
 772    }
 773}
 774
 775#[derive(Serialize, Debug, clickhouse::Row)]
 776pub struct CallEventRow {
 777    // AppInfoBase
 778    app_version: String,
 779    major: Option<i32>,
 780    minor: Option<i32>,
 781    patch: Option<i32>,
 782    release_channel: String,
 783
 784    // ClientEventBase
 785    installation_id: String,
 786    session_id: Option<String>,
 787    is_staff: Option<bool>,
 788    time: i64,
 789
 790    // CallEventRow
 791    operation: String,
 792    room_id: Option<u64>,
 793    channel_id: Option<u64>,
 794}
 795
 796impl CallEventRow {
 797    fn from_event(
 798        event: CallEvent,
 799        wrapper: &EventWrapper,
 800        body: &EventRequestBody,
 801        first_event_at: chrono::DateTime<chrono::Utc>,
 802    ) -> Self {
 803        let semver = body.semver();
 804        let time =
 805            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 806
 807        Self {
 808            app_version: body.app_version.clone(),
 809            major: semver.map(|v| v.major() as i32),
 810            minor: semver.map(|v| v.minor() as i32),
 811            patch: semver.map(|v| v.patch() as i32),
 812            release_channel: body.release_channel.clone().unwrap_or_default(),
 813            installation_id: body.installation_id.clone().unwrap_or_default(),
 814            session_id: body.session_id.clone(),
 815            is_staff: body.is_staff,
 816            time: time.timestamp_millis(),
 817            operation: event.operation,
 818            room_id: event.room_id,
 819            channel_id: event.channel_id,
 820        }
 821    }
 822}
 823
 824#[derive(Serialize, Debug, clickhouse::Row)]
 825pub struct AssistantEventRow {
 826    // AppInfoBase
 827    app_version: String,
 828    major: Option<i32>,
 829    minor: Option<i32>,
 830    patch: Option<i32>,
 831    release_channel: String,
 832
 833    // ClientEventBase
 834    installation_id: Option<String>,
 835    session_id: Option<String>,
 836    is_staff: Option<bool>,
 837    time: i64,
 838
 839    // AssistantEventRow
 840    conversation_id: String,
 841    kind: String,
 842    model: String,
 843    response_latency_in_ms: Option<i64>,
 844    error_message: Option<String>,
 845}
 846
 847impl AssistantEventRow {
 848    fn from_event(
 849        event: AssistantEvent,
 850        wrapper: &EventWrapper,
 851        body: &EventRequestBody,
 852        first_event_at: chrono::DateTime<chrono::Utc>,
 853    ) -> Self {
 854        let semver = body.semver();
 855        let time =
 856            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 857
 858        Self {
 859            app_version: body.app_version.clone(),
 860            major: semver.map(|v| v.major() as i32),
 861            minor: semver.map(|v| v.minor() as i32),
 862            patch: semver.map(|v| v.patch() as i32),
 863            release_channel: body.release_channel.clone().unwrap_or_default(),
 864            installation_id: body.installation_id.clone(),
 865            session_id: body.session_id.clone(),
 866            is_staff: body.is_staff,
 867            time: time.timestamp_millis(),
 868            conversation_id: event.conversation_id.unwrap_or_default(),
 869            kind: event.kind.to_string(),
 870            model: event.model,
 871            response_latency_in_ms: event
 872                .response_latency
 873                .map(|latency| latency.as_millis() as i64),
 874            error_message: event.error_message,
 875        }
 876    }
 877}
 878
 879#[derive(Debug, clickhouse::Row, Serialize)]
 880pub struct CpuEventRow {
 881    pub installation_id: Option<String>,
 882    pub is_staff: Option<bool>,
 883    pub usage_as_percentage: f32,
 884    pub core_count: u32,
 885    pub app_version: String,
 886    pub release_channel: String,
 887    pub time: i64,
 888    pub session_id: Option<String>,
 889    // pub normalized_cpu_usage: f64, MATERIALIZED
 890    pub major: Option<i32>,
 891    pub minor: Option<i32>,
 892    pub patch: Option<i32>,
 893}
 894
 895impl CpuEventRow {
 896    fn from_event(
 897        event: CpuEvent,
 898        wrapper: &EventWrapper,
 899        body: &EventRequestBody,
 900        first_event_at: chrono::DateTime<chrono::Utc>,
 901    ) -> Self {
 902        let semver = body.semver();
 903        let time =
 904            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 905
 906        Self {
 907            app_version: body.app_version.clone(),
 908            major: semver.map(|v| v.major() as i32),
 909            minor: semver.map(|v| v.minor() as i32),
 910            patch: semver.map(|v| v.patch() as i32),
 911            release_channel: body.release_channel.clone().unwrap_or_default(),
 912            installation_id: body.installation_id.clone(),
 913            session_id: body.session_id.clone(),
 914            is_staff: body.is_staff,
 915            time: time.timestamp_millis(),
 916            usage_as_percentage: event.usage_as_percentage,
 917            core_count: event.core_count,
 918        }
 919    }
 920}
 921
 922#[derive(Serialize, Debug, clickhouse::Row)]
 923pub struct MemoryEventRow {
 924    // AppInfoBase
 925    app_version: String,
 926    major: Option<i32>,
 927    minor: Option<i32>,
 928    patch: Option<i32>,
 929    release_channel: String,
 930
 931    // ClientEventBase
 932    installation_id: Option<String>,
 933    session_id: Option<String>,
 934    is_staff: Option<bool>,
 935    time: i64,
 936
 937    // MemoryEventRow
 938    memory_in_bytes: u64,
 939    virtual_memory_in_bytes: u64,
 940}
 941
 942impl MemoryEventRow {
 943    fn from_event(
 944        event: MemoryEvent,
 945        wrapper: &EventWrapper,
 946        body: &EventRequestBody,
 947        first_event_at: chrono::DateTime<chrono::Utc>,
 948    ) -> Self {
 949        let semver = body.semver();
 950        let time =
 951            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 952
 953        Self {
 954            app_version: body.app_version.clone(),
 955            major: semver.map(|v| v.major() as i32),
 956            minor: semver.map(|v| v.minor() as i32),
 957            patch: semver.map(|v| v.patch() as i32),
 958            release_channel: body.release_channel.clone().unwrap_or_default(),
 959            installation_id: body.installation_id.clone(),
 960            session_id: body.session_id.clone(),
 961            is_staff: body.is_staff,
 962            time: time.timestamp_millis(),
 963            memory_in_bytes: event.memory_in_bytes,
 964            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 965        }
 966    }
 967}
 968
 969#[derive(Serialize, Debug, clickhouse::Row)]
 970pub struct AppEventRow {
 971    // AppInfoBase
 972    app_version: String,
 973    major: Option<i32>,
 974    minor: Option<i32>,
 975    patch: Option<i32>,
 976    release_channel: String,
 977
 978    // ClientEventBase
 979    installation_id: Option<String>,
 980    session_id: Option<String>,
 981    is_staff: Option<bool>,
 982    time: i64,
 983
 984    // AppEventRow
 985    operation: String,
 986}
 987
 988impl AppEventRow {
 989    fn from_event(
 990        event: AppEvent,
 991        wrapper: &EventWrapper,
 992        body: &EventRequestBody,
 993        first_event_at: chrono::DateTime<chrono::Utc>,
 994    ) -> Self {
 995        let semver = body.semver();
 996        let time =
 997            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 998
 999        Self {
1000            app_version: body.app_version.clone(),
1001            major: semver.map(|v| v.major() as i32),
1002            minor: semver.map(|v| v.minor() as i32),
1003            patch: semver.map(|v| v.patch() as i32),
1004            release_channel: body.release_channel.clone().unwrap_or_default(),
1005            installation_id: body.installation_id.clone(),
1006            session_id: body.session_id.clone(),
1007            is_staff: body.is_staff,
1008            time: time.timestamp_millis(),
1009            operation: event.operation,
1010        }
1011    }
1012}
1013
1014#[derive(Serialize, Debug, clickhouse::Row)]
1015pub struct SettingEventRow {
1016    // AppInfoBase
1017    app_version: String,
1018    major: Option<i32>,
1019    minor: Option<i32>,
1020    patch: Option<i32>,
1021    release_channel: String,
1022
1023    // ClientEventBase
1024    installation_id: Option<String>,
1025    session_id: Option<String>,
1026    is_staff: Option<bool>,
1027    time: i64,
1028    // SettingEventRow
1029    setting: String,
1030    value: String,
1031}
1032
1033impl SettingEventRow {
1034    fn from_event(
1035        event: SettingEvent,
1036        wrapper: &EventWrapper,
1037        body: &EventRequestBody,
1038        first_event_at: chrono::DateTime<chrono::Utc>,
1039    ) -> Self {
1040        let semver = body.semver();
1041        let time =
1042            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1043
1044        Self {
1045            app_version: body.app_version.clone(),
1046            major: semver.map(|v| v.major() as i32),
1047            minor: semver.map(|v| v.minor() as i32),
1048            patch: semver.map(|v| v.patch() as i32),
1049            release_channel: body.release_channel.clone().unwrap_or_default(),
1050            installation_id: body.installation_id.clone(),
1051            session_id: body.session_id.clone(),
1052            is_staff: body.is_staff,
1053            time: time.timestamp_millis(),
1054            setting: event.setting,
1055            value: event.value,
1056        }
1057    }
1058}
1059
1060#[derive(Serialize, Debug, clickhouse::Row)]
1061pub struct ExtensionEventRow {
1062    // AppInfoBase
1063    app_version: String,
1064    major: Option<i32>,
1065    minor: Option<i32>,
1066    patch: Option<i32>,
1067    release_channel: String,
1068
1069    // ClientEventBase
1070    installation_id: Option<String>,
1071    session_id: Option<String>,
1072    is_staff: Option<bool>,
1073    time: i64,
1074
1075    // ExtensionEventRow
1076    extension_id: Arc<str>,
1077    extension_version: Arc<str>,
1078    dev: bool,
1079    schema_version: Option<i32>,
1080    wasm_api_version: Option<String>,
1081}
1082
1083impl ExtensionEventRow {
1084    fn from_event(
1085        event: ExtensionEvent,
1086        wrapper: &EventWrapper,
1087        body: &EventRequestBody,
1088        extension_metadata: Option<ExtensionMetadata>,
1089        first_event_at: chrono::DateTime<chrono::Utc>,
1090    ) -> Self {
1091        let semver = body.semver();
1092        let time =
1093            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1094
1095        Self {
1096            app_version: body.app_version.clone(),
1097            major: semver.map(|v| v.major() as i32),
1098            minor: semver.map(|v| v.minor() as i32),
1099            patch: semver.map(|v| v.patch() as i32),
1100            release_channel: body.release_channel.clone().unwrap_or_default(),
1101            installation_id: body.installation_id.clone(),
1102            session_id: body.session_id.clone(),
1103            is_staff: body.is_staff,
1104            time: time.timestamp_millis(),
1105            extension_id: event.extension_id,
1106            extension_version: event.version,
1107            dev: extension_metadata.is_none(),
1108            schema_version: extension_metadata
1109                .as_ref()
1110                .and_then(|metadata| metadata.manifest.schema_version),
1111            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1112                metadata
1113                    .manifest
1114                    .wasm_api_version
1115                    .as_ref()
1116                    .map(|version| version.to_string())
1117            }),
1118        }
1119    }
1120}
1121
1122#[derive(Serialize, Debug, clickhouse::Row)]
1123pub struct EditEventRow {
1124    // AppInfoBase
1125    app_version: String,
1126    major: Option<i32>,
1127    minor: Option<i32>,
1128    patch: Option<i32>,
1129    release_channel: String,
1130
1131    // ClientEventBase
1132    installation_id: Option<String>,
1133    // Note: This column name has a typo in the ClickHouse table.
1134    #[serde(rename = "sesssion_id")]
1135    session_id: Option<String>,
1136    is_staff: Option<bool>,
1137    time: i64,
1138
1139    // EditEventRow
1140    period_start: i64,
1141    period_end: i64,
1142    environment: String,
1143}
1144
1145impl EditEventRow {
1146    fn from_event(
1147        event: EditEvent,
1148        wrapper: &EventWrapper,
1149        body: &EventRequestBody,
1150        first_event_at: chrono::DateTime<chrono::Utc>,
1151    ) -> Self {
1152        let semver = body.semver();
1153        let time =
1154            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1155
1156        let period_start = time - chrono::Duration::milliseconds(event.duration);
1157        let period_end = time;
1158
1159        Self {
1160            app_version: body.app_version.clone(),
1161            major: semver.map(|v| v.major() as i32),
1162            minor: semver.map(|v| v.minor() as i32),
1163            patch: semver.map(|v| v.patch() as i32),
1164            release_channel: body.release_channel.clone().unwrap_or_default(),
1165            installation_id: body.installation_id.clone(),
1166            session_id: body.session_id.clone(),
1167            is_staff: body.is_staff,
1168            time: time.timestamp_millis(),
1169            period_start: period_start.timestamp_millis(),
1170            period_end: period_end.timestamp_millis(),
1171            environment: event.environment,
1172        }
1173    }
1174}
1175
1176#[derive(Serialize, Debug, clickhouse::Row)]
1177pub struct ActionEventRow {
1178    // AppInfoBase
1179    app_version: String,
1180    major: Option<i32>,
1181    minor: Option<i32>,
1182    patch: Option<i32>,
1183    release_channel: String,
1184
1185    // ClientEventBase
1186    installation_id: Option<String>,
1187    // Note: This column name has a typo in the ClickHouse table.
1188    #[serde(rename = "sesssion_id")]
1189    session_id: Option<String>,
1190    is_staff: Option<bool>,
1191    time: i64,
1192    // ActionEventRow
1193    source: String,
1194    action: String,
1195}
1196
1197impl ActionEventRow {
1198    fn from_event(
1199        event: ActionEvent,
1200        wrapper: &EventWrapper,
1201        body: &EventRequestBody,
1202        first_event_at: chrono::DateTime<chrono::Utc>,
1203    ) -> Self {
1204        let semver = body.semver();
1205        let time =
1206            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1207
1208        Self {
1209            app_version: body.app_version.clone(),
1210            major: semver.map(|v| v.major() as i32),
1211            minor: semver.map(|v| v.minor() as i32),
1212            patch: semver.map(|v| v.patch() as i32),
1213            release_channel: body.release_channel.clone().unwrap_or_default(),
1214            installation_id: body.installation_id.clone(),
1215            session_id: body.session_id.clone(),
1216            is_staff: body.is_staff,
1217            time: time.timestamp_millis(),
1218            source: event.source,
1219            action: event.action,
1220        }
1221    }
1222}
1223
1224pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1225    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1226        return None;
1227    };
1228
1229    let mut summer = Sha256::new();
1230    summer.update(checksum_seed);
1231    summer.update(&json);
1232    summer.update(checksum_seed);
1233    Some(summer.finalize().into_iter().collect())
1234}