events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
  19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
  20};
  21use uuid::Uuid;
  22
  23static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  24
  25pub fn router() -> Router {
  26    Router::new()
  27        .route("/telemetry/events", post(post_events))
  28        .route("/telemetry/crashes", post(post_crash))
  29        .route("/telemetry/hangs", post(post_hang))
  30}
  31
  32pub struct ZedChecksumHeader(Vec<u8>);
  33
  34impl Header for ZedChecksumHeader {
  35    fn name() -> &'static HeaderName {
  36        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  37        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  38    }
  39
  40    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  41    where
  42        Self: Sized,
  43        I: Iterator<Item = &'i axum::http::HeaderValue>,
  44    {
  45        let checksum = values
  46            .next()
  47            .ok_or_else(axum::headers::Error::invalid)?
  48            .to_str()
  49            .map_err(|_| axum::headers::Error::invalid())?;
  50
  51        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  52        Ok(Self(bytes))
  53    }
  54
  55    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  56        unimplemented!()
  57    }
  58}
  59
  60pub struct CloudflareIpCountryHeader(String);
  61
  62impl Header for CloudflareIpCountryHeader {
  63    fn name() -> &'static HeaderName {
  64        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  65        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  66    }
  67
  68    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  69    where
  70        Self: Sized,
  71        I: Iterator<Item = &'i axum::http::HeaderValue>,
  72    {
  73        let country_code = values
  74            .next()
  75            .ok_or_else(axum::headers::Error::invalid)?
  76            .to_str()
  77            .map_err(|_| axum::headers::Error::invalid())?;
  78
  79        Ok(Self(country_code.to_string()))
  80    }
  81
  82    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  83        unimplemented!()
  84    }
  85}
  86
  87pub async fn post_crash(
  88    Extension(app): Extension<Arc<AppState>>,
  89    headers: HeaderMap,
  90    body: Bytes,
  91) -> Result<()> {
  92    let report = IpsFile::parse(&body)?;
  93    let version_threshold = SemanticVersion::new(0, 123, 0);
  94
  95    let bundle_id = &report.header.bundle_id;
  96    let app_version = &report.app_version();
  97
  98    if bundle_id == "dev.zed.Zed-Dev" {
  99        log::error!("Crash uploads from {} are ignored.", bundle_id);
 100        return Ok(());
 101    }
 102
 103    if app_version.is_none() || app_version.unwrap() < version_threshold {
 104        log::error!(
 105            "Crash uploads from {} are ignored.",
 106            report.header.app_version
 107        );
 108        return Ok(());
 109    }
 110    let app_version = app_version.unwrap();
 111
 112    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 113        let response = blob_store_client
 114            .head_object()
 115            .bucket(CRASH_REPORTS_BUCKET)
 116            .key(report.header.incident_id.clone() + ".ips")
 117            .send()
 118            .await;
 119
 120        if response.is_ok() {
 121            log::info!("We've already uploaded this crash");
 122            return Ok(());
 123        }
 124
 125        blob_store_client
 126            .put_object()
 127            .bucket(CRASH_REPORTS_BUCKET)
 128            .key(report.header.incident_id.clone() + ".ips")
 129            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 130            .body(ByteStream::from(body.to_vec()))
 131            .send()
 132            .await
 133            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 134            .ok();
 135    }
 136
 137    let recent_panic_on: Option<i64> = headers
 138        .get("x-zed-panicked-on")
 139        .and_then(|h| h.to_str().ok())
 140        .and_then(|s| s.parse().ok());
 141
 142    let installation_id = headers
 143        .get("x-zed-installation-id")
 144        .and_then(|h| h.to_str().ok())
 145        .map(|s| s.to_string())
 146        .unwrap_or_default();
 147
 148    let mut recent_panic = None;
 149
 150    if let Some(recent_panic_on) = recent_panic_on {
 151        let crashed_at = match report.timestamp() {
 152            Ok(t) => Some(t),
 153            Err(e) => {
 154                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 155                None
 156            }
 157        };
 158        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 159            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 160        }
 161    }
 162
 163    let description = report.description(recent_panic);
 164    let summary = report.backtrace_summary();
 165
 166    tracing::error!(
 167        service = "client",
 168        version = %report.header.app_version,
 169        os_version = %report.header.os_version,
 170        bundle_id = %report.header.bundle_id,
 171        incident_id = %report.header.incident_id,
 172        installation_id = %installation_id,
 173        description = %description,
 174        backtrace = %summary,
 175        "crash report");
 176
 177    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 178        let payload = slack::WebhookBody::new(|w| {
 179            w.add_section(|s| s.text(slack::Text::markdown(description)))
 180                .add_section(|s| {
 181                    s.add_field(slack::Text::markdown(format!(
 182                        "*Version:*\n{} ({})",
 183                        bundle_id, app_version
 184                    )))
 185                    .add_field({
 186                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 187                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 188                            hostname.strip_prefix("http://").unwrap_or_default()
 189                        });
 190
 191                        slack::Text::markdown(format!(
 192                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 193                            CRASH_REPORTS_BUCKET,
 194                            hostname,
 195                            report.header.incident_id,
 196                            report
 197                                .header
 198                                .incident_id
 199                                .chars()
 200                                .take(8)
 201                                .collect::<String>(),
 202                        ))
 203                    })
 204                })
 205                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 206        });
 207        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 208            log::error!("Failed to serialize payload to JSON: {err}");
 209            Error::Internal(anyhow!(err))
 210        })?;
 211
 212        reqwest::Client::new()
 213            .post(slack_panics_webhook)
 214            .header("Content-Type", "application/json")
 215            .body(payload_json)
 216            .send()
 217            .await
 218            .map_err(|err| {
 219                log::error!("Failed to send payload to Slack: {err}");
 220                Error::Internal(anyhow!(err))
 221            })?;
 222    }
 223
 224    Ok(())
 225}
 226
 227pub async fn post_hang(
 228    Extension(app): Extension<Arc<AppState>>,
 229    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 230    body: Bytes,
 231) -> Result<()> {
 232    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 233        return Err(Error::Http(
 234            StatusCode::INTERNAL_SERVER_ERROR,
 235            "events not enabled".into(),
 236        ))?;
 237    };
 238
 239    if checksum != expected {
 240        return Err(Error::Http(
 241            StatusCode::BAD_REQUEST,
 242            "invalid checksum".into(),
 243        ))?;
 244    }
 245
 246    let incident_id = Uuid::new_v4().to_string();
 247
 248    // dump JSON into S3 so we can get frame offsets if we need to.
 249    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 250        blob_store_client
 251            .put_object()
 252            .bucket(CRASH_REPORTS_BUCKET)
 253            .key(incident_id.clone() + ".hang.json")
 254            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 255            .body(ByteStream::from(body.to_vec()))
 256            .send()
 257            .await
 258            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 259            .ok();
 260    }
 261
 262    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 263        log::error!("can't parse report json: {err}");
 264        Error::Internal(anyhow!(err))
 265    })?;
 266
 267    let mut backtrace = "Possible hang detected on main thread:".to_string();
 268    let unknown = "<unknown>".to_string();
 269    for frame in report.backtrace.iter() {
 270        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 271    }
 272
 273    tracing::error!(
 274        service = "client",
 275        version = %report.app_version.unwrap_or_default().to_string(),
 276        os_name = %report.os_name,
 277        os_version = report.os_version.unwrap_or_default().to_string(),
 278        incident_id = %incident_id,
 279        installation_id = %report.installation_id.unwrap_or_default(),
 280        backtrace = %backtrace,
 281        "hang report");
 282
 283    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 284        let payload = slack::WebhookBody::new(|w| {
 285            w.add_section(|s| s.text(slack::Text::markdown("Possible Hang".to_string())))
 286                .add_section(|s| {
 287                    s.add_field(slack::Text::markdown(format!(
 288                        "*Version:*\n {} ",
 289                        report.app_version.unwrap_or_default()
 290                    )))
 291                    .add_field({
 292                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 293                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 294                            hostname.strip_prefix("http://").unwrap_or_default()
 295                        });
 296
 297                        slack::Text::markdown(format!(
 298                            "*Incident:*\n<https://{}.{}/{}.hang.json|{}…>",
 299                            CRASH_REPORTS_BUCKET,
 300                            hostname,
 301                            incident_id,
 302                            incident_id.chars().take(8).collect::<String>(),
 303                        ))
 304                    })
 305                })
 306                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace)))
 307        });
 308        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 309            log::error!("Failed to serialize payload to JSON: {err}");
 310            Error::Internal(anyhow!(err))
 311        })?;
 312
 313        reqwest::Client::new()
 314            .post(slack_panics_webhook)
 315            .header("Content-Type", "application/json")
 316            .body(payload_json)
 317            .send()
 318            .await
 319            .map_err(|err| {
 320                log::error!("Failed to send payload to Slack: {err}");
 321                Error::Internal(anyhow!(err))
 322            })?;
 323    }
 324
 325    Ok(())
 326}
 327
 328pub async fn post_events(
 329    Extension(app): Extension<Arc<AppState>>,
 330    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 331    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 332    body: Bytes,
 333) -> Result<()> {
 334    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 335        Err(Error::Http(
 336            StatusCode::NOT_IMPLEMENTED,
 337            "not supported".into(),
 338        ))?
 339    };
 340
 341    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 342        return Err(Error::Http(
 343            StatusCode::INTERNAL_SERVER_ERROR,
 344            "events not enabled".into(),
 345        ))?;
 346    };
 347
 348    if checksum != expected {
 349        return Err(Error::Http(
 350            StatusCode::BAD_REQUEST,
 351            "invalid checksum".into(),
 352        ))?;
 353    }
 354
 355    let request_body: telemetry_events::EventRequestBody =
 356        serde_json::from_slice(&body).map_err(|err| {
 357            log::error!("can't parse event json: {err}");
 358            Error::Internal(anyhow!(err))
 359        })?;
 360
 361    let mut to_upload = ToUpload::default();
 362    let Some(last_event) = request_body.events.last() else {
 363        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 364    };
 365    let country_code = country_code_header.map(|h| h.0 .0);
 366
 367    let first_event_at = chrono::Utc::now()
 368        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 369
 370    for wrapper in &request_body.events {
 371        match &wrapper.event {
 372            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 373                event.clone(),
 374                &wrapper,
 375                &request_body,
 376                first_event_at,
 377                country_code.clone(),
 378            )),
 379            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
 380                event.clone(),
 381                &wrapper,
 382                &request_body,
 383                first_event_at,
 384                country_code.clone(),
 385            )),
 386            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 387                event.clone(),
 388                &wrapper,
 389                &request_body,
 390                first_event_at,
 391            )),
 392            Event::Assistant(event) => {
 393                to_upload
 394                    .assistant_events
 395                    .push(AssistantEventRow::from_event(
 396                        event.clone(),
 397                        &wrapper,
 398                        &request_body,
 399                        first_event_at,
 400                    ))
 401            }
 402            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 403                event.clone(),
 404                &wrapper,
 405                &request_body,
 406                first_event_at,
 407            )),
 408            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 409                event.clone(),
 410                &wrapper,
 411                &request_body,
 412                first_event_at,
 413            )),
 414            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 415                event.clone(),
 416                &wrapper,
 417                &request_body,
 418                first_event_at,
 419            )),
 420            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 421                event.clone(),
 422                &wrapper,
 423                &request_body,
 424                first_event_at,
 425            )),
 426            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 427                event.clone(),
 428                &wrapper,
 429                &request_body,
 430                first_event_at,
 431            )),
 432            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 433                event.clone(),
 434                &wrapper,
 435                &request_body,
 436                first_event_at,
 437            )),
 438            Event::Extension(event) => {
 439                let metadata = app
 440                    .db
 441                    .get_extension_version(&event.extension_id, &event.version)
 442                    .await?;
 443                to_upload
 444                    .extension_events
 445                    .push(ExtensionEventRow::from_event(
 446                        event.clone(),
 447                        &wrapper,
 448                        &request_body,
 449                        metadata,
 450                        first_event_at,
 451                    ))
 452            }
 453        }
 454    }
 455
 456    to_upload
 457        .upload(&clickhouse_client)
 458        .await
 459        .map_err(|err| Error::Internal(anyhow!(err)))?;
 460
 461    Ok(())
 462}
 463
 464#[derive(Default)]
 465struct ToUpload {
 466    editor_events: Vec<EditorEventRow>,
 467    copilot_events: Vec<CopilotEventRow>,
 468    assistant_events: Vec<AssistantEventRow>,
 469    call_events: Vec<CallEventRow>,
 470    cpu_events: Vec<CpuEventRow>,
 471    memory_events: Vec<MemoryEventRow>,
 472    app_events: Vec<AppEventRow>,
 473    setting_events: Vec<SettingEventRow>,
 474    extension_events: Vec<ExtensionEventRow>,
 475    edit_events: Vec<EditEventRow>,
 476    action_events: Vec<ActionEventRow>,
 477}
 478
 479impl ToUpload {
 480    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 481        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 482        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 483            .await
 484            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 485
 486        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
 487        Self::upload_to_table(
 488            COPILOT_EVENTS_TABLE,
 489            &self.copilot_events,
 490            clickhouse_client,
 491        )
 492        .await
 493        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
 494
 495        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 496        Self::upload_to_table(
 497            ASSISTANT_EVENTS_TABLE,
 498            &self.assistant_events,
 499            clickhouse_client,
 500        )
 501        .await
 502        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 503
 504        const CALL_EVENTS_TABLE: &str = "call_events";
 505        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 506            .await
 507            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 508
 509        const CPU_EVENTS_TABLE: &str = "cpu_events";
 510        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 511            .await
 512            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 513
 514        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 515        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 516            .await
 517            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 518
 519        const APP_EVENTS_TABLE: &str = "app_events";
 520        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 521            .await
 522            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 523
 524        const SETTING_EVENTS_TABLE: &str = "setting_events";
 525        Self::upload_to_table(
 526            SETTING_EVENTS_TABLE,
 527            &self.setting_events,
 528            clickhouse_client,
 529        )
 530        .await
 531        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 532
 533        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 534        Self::upload_to_table(
 535            EXTENSION_EVENTS_TABLE,
 536            &self.extension_events,
 537            clickhouse_client,
 538        )
 539        .await
 540        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 541
 542        const EDIT_EVENTS_TABLE: &str = "edit_events";
 543        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 544            .await
 545            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 546
 547        const ACTION_EVENTS_TABLE: &str = "action_events";
 548        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 549            .await
 550            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 551
 552        Ok(())
 553    }
 554
 555    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 556        table: &str,
 557        rows: &[T],
 558        clickhouse_client: &clickhouse::Client,
 559    ) -> anyhow::Result<()> {
 560        if !rows.is_empty() {
 561            let mut insert = clickhouse_client.insert(table)?;
 562
 563            for event in rows {
 564                insert.write(event).await?;
 565            }
 566
 567            insert.end().await?;
 568
 569            let event_count = rows.len();
 570            log::info!(
 571                "wrote {event_count} {event_specifier} to '{table}'",
 572                event_specifier = if event_count == 1 { "event" } else { "events" }
 573            );
 574        }
 575
 576        Ok(())
 577    }
 578}
 579
 580pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 581where
 582    S: Serializer,
 583{
 584    if country_code.len() != 2 {
 585        use serde::ser::Error;
 586        return Err(S::Error::custom(
 587            "country_code must be exactly 2 characters",
 588        ));
 589    }
 590
 591    let country_code = country_code.as_bytes();
 592
 593    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
 594}
 595
 596#[derive(Serialize, Debug, clickhouse::Row)]
 597pub struct EditorEventRow {
 598    pub installation_id: String,
 599    pub operation: String,
 600    pub app_version: String,
 601    pub file_extension: String,
 602    pub os_name: String,
 603    pub os_version: String,
 604    pub release_channel: String,
 605    pub signed_in: bool,
 606    pub vim_mode: bool,
 607    #[serde(serialize_with = "serialize_country_code")]
 608    pub country_code: String,
 609    pub region_code: String,
 610    pub city: String,
 611    pub time: i64,
 612    pub copilot_enabled: bool,
 613    pub copilot_enabled_for_language: bool,
 614    pub historical_event: bool,
 615    pub architecture: String,
 616    pub is_staff: Option<bool>,
 617    pub session_id: Option<String>,
 618    pub major: Option<i32>,
 619    pub minor: Option<i32>,
 620    pub patch: Option<i32>,
 621}
 622
 623impl EditorEventRow {
 624    fn from_event(
 625        event: EditorEvent,
 626        wrapper: &EventWrapper,
 627        body: &EventRequestBody,
 628        first_event_at: chrono::DateTime<chrono::Utc>,
 629        country_code: Option<String>,
 630    ) -> Self {
 631        let semver = body.semver();
 632        let time =
 633            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 634
 635        Self {
 636            app_version: body.app_version.clone(),
 637            major: semver.map(|v| v.major() as i32),
 638            minor: semver.map(|v| v.minor() as i32),
 639            patch: semver.map(|v| v.patch() as i32),
 640            release_channel: body.release_channel.clone().unwrap_or_default(),
 641            os_name: body.os_name.clone(),
 642            os_version: body.os_version.clone().unwrap_or_default(),
 643            architecture: body.architecture.clone(),
 644            installation_id: body.installation_id.clone().unwrap_or_default(),
 645            session_id: body.session_id.clone(),
 646            is_staff: body.is_staff,
 647            time: time.timestamp_millis(),
 648            operation: event.operation,
 649            file_extension: event.file_extension.unwrap_or_default(),
 650            signed_in: wrapper.signed_in,
 651            vim_mode: event.vim_mode,
 652            copilot_enabled: event.copilot_enabled,
 653            copilot_enabled_for_language: event.copilot_enabled_for_language,
 654            country_code: country_code.unwrap_or("XX".to_string()),
 655            region_code: "".to_string(),
 656            city: "".to_string(),
 657            historical_event: false,
 658        }
 659    }
 660}
 661
 662#[derive(Serialize, Debug, clickhouse::Row)]
 663pub struct CopilotEventRow {
 664    pub installation_id: String,
 665    pub suggestion_id: String,
 666    pub suggestion_accepted: bool,
 667    pub app_version: String,
 668    pub file_extension: String,
 669    pub os_name: String,
 670    pub os_version: String,
 671    pub release_channel: String,
 672    pub signed_in: bool,
 673    #[serde(serialize_with = "serialize_country_code")]
 674    pub country_code: String,
 675    pub region_code: String,
 676    pub city: String,
 677    pub time: i64,
 678    pub is_staff: Option<bool>,
 679    pub session_id: Option<String>,
 680    pub major: Option<i32>,
 681    pub minor: Option<i32>,
 682    pub patch: Option<i32>,
 683}
 684
 685impl CopilotEventRow {
 686    fn from_event(
 687        event: CopilotEvent,
 688        wrapper: &EventWrapper,
 689        body: &EventRequestBody,
 690        first_event_at: chrono::DateTime<chrono::Utc>,
 691        country_code: Option<String>,
 692    ) -> Self {
 693        let semver = body.semver();
 694        let time =
 695            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 696
 697        Self {
 698            app_version: body.app_version.clone(),
 699            major: semver.map(|v| v.major() as i32),
 700            minor: semver.map(|v| v.minor() as i32),
 701            patch: semver.map(|v| v.patch() as i32),
 702            release_channel: body.release_channel.clone().unwrap_or_default(),
 703            os_name: body.os_name.clone(),
 704            os_version: body.os_version.clone().unwrap_or_default(),
 705            installation_id: body.installation_id.clone().unwrap_or_default(),
 706            session_id: body.session_id.clone(),
 707            is_staff: body.is_staff,
 708            time: time.timestamp_millis(),
 709            file_extension: event.file_extension.unwrap_or_default(),
 710            signed_in: wrapper.signed_in,
 711            country_code: country_code.unwrap_or("XX".to_string()),
 712            region_code: "".to_string(),
 713            city: "".to_string(),
 714            suggestion_id: event.suggestion_id.unwrap_or_default(),
 715            suggestion_accepted: event.suggestion_accepted,
 716        }
 717    }
 718}
 719
 720#[derive(Serialize, Debug, clickhouse::Row)]
 721pub struct CallEventRow {
 722    // AppInfoBase
 723    app_version: String,
 724    major: Option<i32>,
 725    minor: Option<i32>,
 726    patch: Option<i32>,
 727    release_channel: String,
 728
 729    // ClientEventBase
 730    installation_id: String,
 731    session_id: Option<String>,
 732    is_staff: Option<bool>,
 733    time: i64,
 734
 735    // CallEventRow
 736    operation: String,
 737    room_id: Option<u64>,
 738    channel_id: Option<u64>,
 739}
 740
 741impl CallEventRow {
 742    fn from_event(
 743        event: CallEvent,
 744        wrapper: &EventWrapper,
 745        body: &EventRequestBody,
 746        first_event_at: chrono::DateTime<chrono::Utc>,
 747    ) -> Self {
 748        let semver = body.semver();
 749        let time =
 750            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 751
 752        Self {
 753            app_version: body.app_version.clone(),
 754            major: semver.map(|v| v.major() as i32),
 755            minor: semver.map(|v| v.minor() as i32),
 756            patch: semver.map(|v| v.patch() as i32),
 757            release_channel: body.release_channel.clone().unwrap_or_default(),
 758            installation_id: body.installation_id.clone().unwrap_or_default(),
 759            session_id: body.session_id.clone(),
 760            is_staff: body.is_staff,
 761            time: time.timestamp_millis(),
 762            operation: event.operation,
 763            room_id: event.room_id,
 764            channel_id: event.channel_id,
 765        }
 766    }
 767}
 768
 769#[derive(Serialize, Debug, clickhouse::Row)]
 770pub struct AssistantEventRow {
 771    // AppInfoBase
 772    app_version: String,
 773    major: Option<i32>,
 774    minor: Option<i32>,
 775    patch: Option<i32>,
 776    release_channel: String,
 777
 778    // ClientEventBase
 779    installation_id: Option<String>,
 780    session_id: Option<String>,
 781    is_staff: Option<bool>,
 782    time: i64,
 783
 784    // AssistantEventRow
 785    conversation_id: String,
 786    kind: String,
 787    model: String,
 788    response_latency_in_ms: Option<i64>,
 789    error_message: Option<String>,
 790}
 791
 792impl AssistantEventRow {
 793    fn from_event(
 794        event: AssistantEvent,
 795        wrapper: &EventWrapper,
 796        body: &EventRequestBody,
 797        first_event_at: chrono::DateTime<chrono::Utc>,
 798    ) -> Self {
 799        let semver = body.semver();
 800        let time =
 801            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 802
 803        Self {
 804            app_version: body.app_version.clone(),
 805            major: semver.map(|v| v.major() as i32),
 806            minor: semver.map(|v| v.minor() as i32),
 807            patch: semver.map(|v| v.patch() as i32),
 808            release_channel: body.release_channel.clone().unwrap_or_default(),
 809            installation_id: body.installation_id.clone(),
 810            session_id: body.session_id.clone(),
 811            is_staff: body.is_staff,
 812            time: time.timestamp_millis(),
 813            conversation_id: event.conversation_id.unwrap_or_default(),
 814            kind: event.kind.to_string(),
 815            model: event.model,
 816            response_latency_in_ms: event
 817                .response_latency
 818                .map(|latency| latency.as_millis() as i64),
 819            error_message: event.error_message,
 820        }
 821    }
 822}
 823
/// Row serialized for a single CPU telemetry event.
///
/// Instances are built by `CpuEventRow::from_event` from a `CpuEvent` plus the
/// enclosing request body.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// against the table schema before renaming or reordering anything here.
#[derive(Debug, clickhouse::Row, Serialize)]
pub struct CpuEventRow {
    // Client identity, copied from the request body.
    pub installation_id: Option<String>,
    pub is_staff: Option<bool>,
    // Measurements, copied from the `CpuEvent` payload.
    pub usage_as_percentage: f32,
    pub core_count: u32,
    // App info, copied from the request body (empty string when the release
    // channel is missing).
    pub app_version: String,
    pub release_channel: String,
    // Event time as Unix milliseconds (see `from_event`).
    pub time: i64,
    pub session_id: Option<String>,
    // `normalized_cpu_usage: f64` is deliberately not a field of this struct;
    // the original note marks it MATERIALIZED (presumably computed by
    // ClickHouse itself - confirm against the table definition).
    // pub normalized_cpu_usage: f64, MATERIALIZED
    // Components of the parsed app version; `None` when `body.semver()` yields
    // no version.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
 839
 840impl CpuEventRow {
 841    fn from_event(
 842        event: CpuEvent,
 843        wrapper: &EventWrapper,
 844        body: &EventRequestBody,
 845        first_event_at: chrono::DateTime<chrono::Utc>,
 846    ) -> Self {
 847        let semver = body.semver();
 848        let time =
 849            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 850
 851        Self {
 852            app_version: body.app_version.clone(),
 853            major: semver.map(|v| v.major() as i32),
 854            minor: semver.map(|v| v.minor() as i32),
 855            patch: semver.map(|v| v.patch() as i32),
 856            release_channel: body.release_channel.clone().unwrap_or_default(),
 857            installation_id: body.installation_id.clone(),
 858            session_id: body.session_id.clone(),
 859            is_staff: body.is_staff,
 860            time: time.timestamp_millis(),
 861            usage_as_percentage: event.usage_as_percentage,
 862            core_count: event.core_count,
 863        }
 864    }
 865}
 866
/// Row serialized for a single memory-usage telemetry event.
///
/// Instances are built by `MemoryEventRow::from_event` from a `MemoryEvent`
/// plus the enclosing request body.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// before renaming or reordering.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct MemoryEventRow {
    // AppInfoBase: app version string, its parsed components (`None` when no
    // semantic version is available) and the release channel (empty string
    // when the request body has none).
    app_version: String,
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase: client identity copied from the request body, plus the
    // event time as Unix milliseconds.
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    time: i64,

    // MemoryEventRow: values copied verbatim from the `MemoryEvent` payload.
    memory_in_bytes: u64,
    virtual_memory_in_bytes: u64,
}
 886
 887impl MemoryEventRow {
 888    fn from_event(
 889        event: MemoryEvent,
 890        wrapper: &EventWrapper,
 891        body: &EventRequestBody,
 892        first_event_at: chrono::DateTime<chrono::Utc>,
 893    ) -> Self {
 894        let semver = body.semver();
 895        let time =
 896            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 897
 898        Self {
 899            app_version: body.app_version.clone(),
 900            major: semver.map(|v| v.major() as i32),
 901            minor: semver.map(|v| v.minor() as i32),
 902            patch: semver.map(|v| v.patch() as i32),
 903            release_channel: body.release_channel.clone().unwrap_or_default(),
 904            installation_id: body.installation_id.clone(),
 905            session_id: body.session_id.clone(),
 906            is_staff: body.is_staff,
 907            time: time.timestamp_millis(),
 908            memory_in_bytes: event.memory_in_bytes,
 909            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 910        }
 911    }
 912}
 913
/// Row serialized for a single app-level telemetry event.
///
/// Instances are built by `AppEventRow::from_event` from an `AppEvent` plus
/// the enclosing request body.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// before renaming or reordering.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AppEventRow {
    // AppInfoBase: app version string, its parsed components (`None` when no
    // semantic version is available) and the release channel (empty string
    // when the request body has none).
    app_version: String,
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase: client identity copied from the request body, plus the
    // event time as Unix milliseconds.
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    time: i64,

    // AppEventRow: the `operation` string copied from the `AppEvent` payload.
    operation: String,
}
 932
 933impl AppEventRow {
 934    fn from_event(
 935        event: AppEvent,
 936        wrapper: &EventWrapper,
 937        body: &EventRequestBody,
 938        first_event_at: chrono::DateTime<chrono::Utc>,
 939    ) -> Self {
 940        let semver = body.semver();
 941        let time =
 942            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 943
 944        Self {
 945            app_version: body.app_version.clone(),
 946            major: semver.map(|v| v.major() as i32),
 947            minor: semver.map(|v| v.minor() as i32),
 948            patch: semver.map(|v| v.patch() as i32),
 949            release_channel: body.release_channel.clone().unwrap_or_default(),
 950            installation_id: body.installation_id.clone(),
 951            session_id: body.session_id.clone(),
 952            is_staff: body.is_staff,
 953            time: time.timestamp_millis(),
 954            operation: event.operation,
 955        }
 956    }
 957}
 958
/// Row serialized for a single setting telemetry event.
///
/// Instances are built by `SettingEventRow::from_event` from a `SettingEvent`
/// plus the enclosing request body.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// before renaming or reordering.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase: app version string, its parsed components (`None` when no
    // semantic version is available) and the release channel (empty string
    // when the request body has none).
    app_version: String,
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase: client identity copied from the request body, plus the
    // event time as Unix milliseconds.
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    time: i64,
    // SettingEventRow: the setting name and its value, both copied as strings
    // from the `SettingEvent` payload.
    setting: String,
    value: String,
}
 977
 978impl SettingEventRow {
 979    fn from_event(
 980        event: SettingEvent,
 981        wrapper: &EventWrapper,
 982        body: &EventRequestBody,
 983        first_event_at: chrono::DateTime<chrono::Utc>,
 984    ) -> Self {
 985        let semver = body.semver();
 986        let time =
 987            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 988
 989        Self {
 990            app_version: body.app_version.clone(),
 991            major: semver.map(|v| v.major() as i32),
 992            minor: semver.map(|v| v.minor() as i32),
 993            patch: semver.map(|v| v.patch() as i32),
 994            release_channel: body.release_channel.clone().unwrap_or_default(),
 995            installation_id: body.installation_id.clone(),
 996            session_id: body.session_id.clone(),
 997            is_staff: body.is_staff,
 998            time: time.timestamp_millis(),
 999            setting: event.setting,
1000            value: event.value,
1001        }
1002    }
1003}
1004
/// Row serialized for a single extension telemetry event.
///
/// Instances are built by `ExtensionEventRow::from_event` from an
/// `ExtensionEvent`, the enclosing request body, and optional
/// `ExtensionMetadata`.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// before renaming or reordering.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase: app version string, its parsed components (`None` when no
    // semantic version is available) and the release channel (empty string
    // when the request body has none).
    app_version: String,
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase: client identity copied from the request body, plus the
    // event time as Unix milliseconds.
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    time: i64,

    // ExtensionEventRow
    // Id and version as reported by the event itself.
    extension_id: Arc<str>,
    extension_version: Arc<str>,
    // `true` when `from_event` received no `ExtensionMetadata` for the
    // extension.
    dev: bool,
    // Taken from the metadata's manifest; `None` when there is no metadata or
    // the manifest does not carry the value.
    schema_version: Option<i32>,
    wasm_api_version: Option<String>,
}
1027
1028impl ExtensionEventRow {
1029    fn from_event(
1030        event: ExtensionEvent,
1031        wrapper: &EventWrapper,
1032        body: &EventRequestBody,
1033        extension_metadata: Option<ExtensionMetadata>,
1034        first_event_at: chrono::DateTime<chrono::Utc>,
1035    ) -> Self {
1036        let semver = body.semver();
1037        let time =
1038            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1039
1040        Self {
1041            app_version: body.app_version.clone(),
1042            major: semver.map(|v| v.major() as i32),
1043            minor: semver.map(|v| v.minor() as i32),
1044            patch: semver.map(|v| v.patch() as i32),
1045            release_channel: body.release_channel.clone().unwrap_or_default(),
1046            installation_id: body.installation_id.clone(),
1047            session_id: body.session_id.clone(),
1048            is_staff: body.is_staff,
1049            time: time.timestamp_millis(),
1050            extension_id: event.extension_id,
1051            extension_version: event.version,
1052            dev: extension_metadata.is_none(),
1053            schema_version: extension_metadata
1054                .as_ref()
1055                .and_then(|metadata| metadata.manifest.schema_version),
1056            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1057                metadata
1058                    .manifest
1059                    .wasm_api_version
1060                    .as_ref()
1061                    .map(|version| version.to_string())
1062            }),
1063        }
1064    }
1065}
1066
/// Row serialized for a single edit telemetry event.
///
/// Instances are built by `EditEventRow::from_event` from an `EditEvent` plus
/// the enclosing request body.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// before renaming or reordering.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase: app version string, its parsed components (`None` when no
    // semantic version is available) and the release channel (empty string
    // when the request body has none).
    app_version: String,
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase: client identity copied from the request body, plus the
    // event time as Unix milliseconds.
    installation_id: Option<String>,
    // Note: the ClickHouse column is misspelled `sesssion_id` (three s's); the
    // serde rename keeps the serialized name matching that column.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    time: i64,

    // EditEventRow
    // Start and end of the edit period as Unix milliseconds; `from_event` sets
    // the end to the event time and the start to that time minus the event's
    // `duration`.
    period_start: i64,
    period_end: i64,
    // Copied verbatim from the `EditEvent` payload.
    environment: String,
}
1089
1090impl EditEventRow {
1091    fn from_event(
1092        event: EditEvent,
1093        wrapper: &EventWrapper,
1094        body: &EventRequestBody,
1095        first_event_at: chrono::DateTime<chrono::Utc>,
1096    ) -> Self {
1097        let semver = body.semver();
1098        let time =
1099            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1100
1101        let period_start = time - chrono::Duration::milliseconds(event.duration);
1102        let period_end = time;
1103
1104        Self {
1105            app_version: body.app_version.clone(),
1106            major: semver.map(|v| v.major() as i32),
1107            minor: semver.map(|v| v.minor() as i32),
1108            patch: semver.map(|v| v.patch() as i32),
1109            release_channel: body.release_channel.clone().unwrap_or_default(),
1110            installation_id: body.installation_id.clone(),
1111            session_id: body.session_id.clone(),
1112            is_staff: body.is_staff,
1113            time: time.timestamp_millis(),
1114            period_start: period_start.timestamp_millis(),
1115            period_end: period_end.timestamp_millis(),
1116            environment: event.environment,
1117        }
1118    }
1119}
1120
/// Row serialized for a single action telemetry event.
///
/// Instances are built by `ActionEventRow::from_event` from an `ActionEvent`
/// plus the enclosing request body.
// NOTE(review): this type derives `clickhouse::Row`; field names and order
// presumably have to line up with the destination table's columns - confirm
// before renaming or reordering.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase: app version string, its parsed components (`None` when no
    // semantic version is available) and the release channel (empty string
    // when the request body has none).
    app_version: String,
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase: client identity copied from the request body, plus the
    // event time as Unix milliseconds.
    installation_id: Option<String>,
    // Note: the ClickHouse column is misspelled `sesssion_id` (three s's); the
    // serde rename keeps the serialized name matching that column.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    time: i64,
    // ActionEventRow: both strings are copied verbatim from the `ActionEvent`
    // payload.
    source: String,
    action: String,
}
1141
1142impl ActionEventRow {
1143    fn from_event(
1144        event: ActionEvent,
1145        wrapper: &EventWrapper,
1146        body: &EventRequestBody,
1147        first_event_at: chrono::DateTime<chrono::Utc>,
1148    ) -> Self {
1149        let semver = body.semver();
1150        let time =
1151            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1152
1153        Self {
1154            app_version: body.app_version.clone(),
1155            major: semver.map(|v| v.major() as i32),
1156            minor: semver.map(|v| v.minor() as i32),
1157            patch: semver.map(|v| v.patch() as i32),
1158            release_channel: body.release_channel.clone().unwrap_or_default(),
1159            installation_id: body.installation_id.clone(),
1160            session_id: body.session_id.clone(),
1161            is_staff: body.is_staff,
1162            time: time.timestamp_millis(),
1163            source: event.source,
1164            action: event.action,
1165        }
1166    }
1167}
1168
1169pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1170    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1171        return None;
1172    };
1173
1174    let mut summer = Sha256::new();
1175    summer.update(checksum_seed);
1176    summer.update(&json);
1177    summer.update(checksum_seed);
1178    Some(summer.finalize().into_iter().collect())
1179}