events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
  19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
  20};
  21use uuid::Uuid;
  22
  23static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  24
  25pub fn router() -> Router {
  26    Router::new()
  27        .route("/telemetry/events", post(post_events))
  28        .route("/telemetry/crashes", post(post_crash))
  29        .route("/telemetry/hangs", post(post_hang))
  30}
  31
  32pub struct ZedChecksumHeader(Vec<u8>);
  33
  34impl Header for ZedChecksumHeader {
  35    fn name() -> &'static HeaderName {
  36        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  37        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  38    }
  39
  40    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  41    where
  42        Self: Sized,
  43        I: Iterator<Item = &'i axum::http::HeaderValue>,
  44    {
  45        let checksum = values
  46            .next()
  47            .ok_or_else(axum::headers::Error::invalid)?
  48            .to_str()
  49            .map_err(|_| axum::headers::Error::invalid())?;
  50
  51        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  52        Ok(Self(bytes))
  53    }
  54
  55    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  56        unimplemented!()
  57    }
  58}
  59
  60pub struct CloudflareIpCountryHeader(String);
  61
  62impl Header for CloudflareIpCountryHeader {
  63    fn name() -> &'static HeaderName {
  64        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  65        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  66    }
  67
  68    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  69    where
  70        Self: Sized,
  71        I: Iterator<Item = &'i axum::http::HeaderValue>,
  72    {
  73        let country_code = values
  74            .next()
  75            .ok_or_else(axum::headers::Error::invalid)?
  76            .to_str()
  77            .map_err(|_| axum::headers::Error::invalid())?;
  78
  79        Ok(Self(country_code.to_string()))
  80    }
  81
  82    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  83        unimplemented!()
  84    }
  85}
  86
  87pub async fn post_crash(
  88    Extension(app): Extension<Arc<AppState>>,
  89    headers: HeaderMap,
  90    body: Bytes,
  91) -> Result<()> {
  92    let report = IpsFile::parse(&body)?;
  93    let version_threshold = SemanticVersion::new(0, 123, 0);
  94
  95    let bundle_id = &report.header.bundle_id;
  96    let app_version = &report.app_version();
  97
  98    if bundle_id == "dev.zed.Zed-Dev" {
  99        log::error!("Crash uploads from {} are ignored.", bundle_id);
 100        return Ok(());
 101    }
 102
 103    if app_version.is_none() || app_version.unwrap() < version_threshold {
 104        log::error!(
 105            "Crash uploads from {} are ignored.",
 106            report.header.app_version
 107        );
 108        return Ok(());
 109    }
 110    let app_version = app_version.unwrap();
 111
 112    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 113        let response = blob_store_client
 114            .head_object()
 115            .bucket(CRASH_REPORTS_BUCKET)
 116            .key(report.header.incident_id.clone() + ".ips")
 117            .send()
 118            .await;
 119
 120        if response.is_ok() {
 121            log::info!("We've already uploaded this crash");
 122            return Ok(());
 123        }
 124
 125        blob_store_client
 126            .put_object()
 127            .bucket(CRASH_REPORTS_BUCKET)
 128            .key(report.header.incident_id.clone() + ".ips")
 129            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 130            .body(ByteStream::from(body.to_vec()))
 131            .send()
 132            .await
 133            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 134            .ok();
 135    }
 136
 137    let recent_panic_on: Option<i64> = headers
 138        .get("x-zed-panicked-on")
 139        .and_then(|h| h.to_str().ok())
 140        .and_then(|s| s.parse().ok());
 141
 142    let installation_id = headers
 143        .get("x-zed-installation-id")
 144        .and_then(|h| h.to_str().ok())
 145        .map(|s| s.to_string())
 146        .unwrap_or_default();
 147
 148    let mut recent_panic = None;
 149
 150    if let Some(recent_panic_on) = recent_panic_on {
 151        let crashed_at = match report.timestamp() {
 152            Ok(t) => Some(t),
 153            Err(e) => {
 154                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 155                None
 156            }
 157        };
 158        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 159            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 160        }
 161    }
 162
 163    let description = report.description(recent_panic);
 164    let summary = report.backtrace_summary();
 165
 166    tracing::error!(
 167        service = "client",
 168        version = %report.header.app_version,
 169        os_version = %report.header.os_version,
 170        bundle_id = %report.header.bundle_id,
 171        incident_id = %report.header.incident_id,
 172        installation_id = %installation_id,
 173        description = %description,
 174        backtrace = %summary,
 175        "crash report");
 176
 177    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 178        let payload = slack::WebhookBody::new(|w| {
 179            w.add_section(|s| s.text(slack::Text::markdown(description)))
 180                .add_section(|s| {
 181                    s.add_field(slack::Text::markdown(format!(
 182                        "*Version:*\n{} ({})",
 183                        bundle_id, app_version
 184                    )))
 185                    .add_field({
 186                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 187                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 188                            hostname.strip_prefix("http://").unwrap_or_default()
 189                        });
 190
 191                        slack::Text::markdown(format!(
 192                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 193                            CRASH_REPORTS_BUCKET,
 194                            hostname,
 195                            report.header.incident_id,
 196                            report
 197                                .header
 198                                .incident_id
 199                                .chars()
 200                                .take(8)
 201                                .collect::<String>(),
 202                        ))
 203                    })
 204                })
 205                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 206        });
 207        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 208            log::error!("Failed to serialize payload to JSON: {err}");
 209            Error::Internal(anyhow!(err))
 210        })?;
 211
 212        reqwest::Client::new()
 213            .post(slack_panics_webhook)
 214            .header("Content-Type", "application/json")
 215            .body(payload_json)
 216            .send()
 217            .await
 218            .map_err(|err| {
 219                log::error!("Failed to send payload to Slack: {err}");
 220                Error::Internal(anyhow!(err))
 221            })?;
 222    }
 223
 224    Ok(())
 225}
 226
 227pub async fn post_hang(
 228    Extension(app): Extension<Arc<AppState>>,
 229    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 230    body: Bytes,
 231) -> Result<()> {
 232    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 233        return Err(Error::Http(
 234            StatusCode::INTERNAL_SERVER_ERROR,
 235            "events not enabled".into(),
 236        ))?;
 237    };
 238
 239    if checksum != expected {
 240        return Err(Error::Http(
 241            StatusCode::BAD_REQUEST,
 242            "invalid checksum".into(),
 243        ))?;
 244    }
 245
 246    let incident_id = Uuid::new_v4().to_string();
 247
 248    // dump JSON into S3 so we can get frame offsets if we need to.
 249    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 250        blob_store_client
 251            .put_object()
 252            .bucket(CRASH_REPORTS_BUCKET)
 253            .key(incident_id.clone() + ".hang.json")
 254            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 255            .body(ByteStream::from(body.to_vec()))
 256            .send()
 257            .await
 258            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 259            .ok();
 260    }
 261
 262    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
 263        log::error!("can't parse report json: {err}");
 264        Error::Internal(anyhow!(err))
 265    })?;
 266
 267    let mut backtrace = "Possible hang detected on main thread:".to_string();
 268    let unknown = "<unknown>".to_string();
 269    for frame in report.backtrace.iter() {
 270        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
 271    }
 272
 273    tracing::error!(
 274        service = "client",
 275        version = %report.app_version.unwrap_or_default().to_string(),
 276        os_name = %report.os_name,
 277        os_version = report.os_version.unwrap_or_default().to_string(),
 278        incident_id = %incident_id,
 279        installation_id = %report.installation_id.unwrap_or_default(),
 280        backtrace = %backtrace,
 281        "hang report");
 282
 283    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 284        let payload = slack::WebhookBody::new(|w| {
 285            w.add_section(|s| s.text(slack::Text::markdown("Possible Hang".to_string())))
 286                .add_section(|s| {
 287                    s.add_field(slack::Text::markdown(format!(
 288                        "*Version:*\n {} ",
 289                        report.app_version.unwrap_or_default()
 290                    )))
 291                    .add_field({
 292                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 293                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 294                            hostname.strip_prefix("http://").unwrap_or_default()
 295                        });
 296
 297                        slack::Text::markdown(format!(
 298                            "*Incident:*\n<https://{}.{}/{}.hang.json|{}…>",
 299                            CRASH_REPORTS_BUCKET,
 300                            hostname,
 301                            incident_id,
 302                            incident_id.chars().take(8).collect::<String>(),
 303                        ))
 304                    })
 305                })
 306                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace)))
 307        });
 308        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 309            log::error!("Failed to serialize payload to JSON: {err}");
 310            Error::Internal(anyhow!(err))
 311        })?;
 312
 313        reqwest::Client::new()
 314            .post(slack_panics_webhook)
 315            .header("Content-Type", "application/json")
 316            .body(payload_json)
 317            .send()
 318            .await
 319            .map_err(|err| {
 320                log::error!("Failed to send payload to Slack: {err}");
 321                Error::Internal(anyhow!(err))
 322            })?;
 323    }
 324
 325    Ok(())
 326}
 327
 328pub async fn post_events(
 329    Extension(app): Extension<Arc<AppState>>,
 330    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 331    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 332    body: Bytes,
 333) -> Result<()> {
 334    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 335        Err(Error::Http(
 336            StatusCode::NOT_IMPLEMENTED,
 337            "not supported".into(),
 338        ))?
 339    };
 340
 341    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 342        return Err(Error::Http(
 343            StatusCode::INTERNAL_SERVER_ERROR,
 344            "events not enabled".into(),
 345        ))?;
 346    };
 347
 348    if checksum != expected {
 349        return Err(Error::Http(
 350            StatusCode::BAD_REQUEST,
 351            "invalid checksum".into(),
 352        ))?;
 353    }
 354
 355    let request_body: telemetry_events::EventRequestBody =
 356        serde_json::from_slice(&body).map_err(|err| {
 357            log::error!("can't parse event json: {err}");
 358            Error::Internal(anyhow!(err))
 359        })?;
 360
 361    let mut to_upload = ToUpload::default();
 362    let Some(last_event) = request_body.events.last() else {
 363        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 364    };
 365    let country_code = country_code_header.map(|h| h.0 .0);
 366
 367    let first_event_at = chrono::Utc::now()
 368        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 369
 370    for wrapper in &request_body.events {
 371        match &wrapper.event {
 372            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 373                event.clone(),
 374                &wrapper,
 375                &request_body,
 376                first_event_at,
 377                country_code.clone(),
 378            )),
 379            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
 380                event.clone(),
 381                &wrapper,
 382                &request_body,
 383                first_event_at,
 384                country_code.clone(),
 385            )),
 386            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 387                event.clone(),
 388                &wrapper,
 389                &request_body,
 390                first_event_at,
 391            )),
 392            Event::Assistant(event) => {
 393                to_upload
 394                    .assistant_events
 395                    .push(AssistantEventRow::from_event(
 396                        event.clone(),
 397                        &wrapper,
 398                        &request_body,
 399                        first_event_at,
 400                    ))
 401            }
 402            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 403                event.clone(),
 404                &wrapper,
 405                &request_body,
 406                first_event_at,
 407            )),
 408            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 409                event.clone(),
 410                &wrapper,
 411                &request_body,
 412                first_event_at,
 413            )),
 414            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 415                event.clone(),
 416                &wrapper,
 417                &request_body,
 418                first_event_at,
 419            )),
 420            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 421                event.clone(),
 422                &wrapper,
 423                &request_body,
 424                first_event_at,
 425            )),
 426            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 427                event.clone(),
 428                &wrapper,
 429                &request_body,
 430                first_event_at,
 431            )),
 432            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 433                event.clone(),
 434                &wrapper,
 435                &request_body,
 436                first_event_at,
 437            )),
 438            Event::Extension(event) => {
 439                let metadata = app
 440                    .db
 441                    .get_extension_version(&event.extension_id, &event.version)
 442                    .await?;
 443                to_upload
 444                    .extension_events
 445                    .push(ExtensionEventRow::from_event(
 446                        event.clone(),
 447                        &wrapper,
 448                        &request_body,
 449                        metadata,
 450                        first_event_at,
 451                    ))
 452            }
 453        }
 454    }
 455
 456    to_upload
 457        .upload(&clickhouse_client)
 458        .await
 459        .map_err(|err| Error::Internal(anyhow!(err)))?;
 460
 461    Ok(())
 462}
 463
 464#[derive(Default)]
 465struct ToUpload {
 466    editor_events: Vec<EditorEventRow>,
 467    copilot_events: Vec<CopilotEventRow>,
 468    assistant_events: Vec<AssistantEventRow>,
 469    call_events: Vec<CallEventRow>,
 470    cpu_events: Vec<CpuEventRow>,
 471    memory_events: Vec<MemoryEventRow>,
 472    app_events: Vec<AppEventRow>,
 473    setting_events: Vec<SettingEventRow>,
 474    extension_events: Vec<ExtensionEventRow>,
 475    edit_events: Vec<EditEventRow>,
 476    action_events: Vec<ActionEventRow>,
 477}
 478
 479impl ToUpload {
 480    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 481        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 482        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 483            .await
 484            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 485
 486        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
 487        Self::upload_to_table(
 488            COPILOT_EVENTS_TABLE,
 489            &self.copilot_events,
 490            clickhouse_client,
 491        )
 492        .await
 493        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
 494
 495        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 496        Self::upload_to_table(
 497            ASSISTANT_EVENTS_TABLE,
 498            &self.assistant_events,
 499            clickhouse_client,
 500        )
 501        .await
 502        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 503
 504        const CALL_EVENTS_TABLE: &str = "call_events";
 505        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 506            .await
 507            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 508
 509        const CPU_EVENTS_TABLE: &str = "cpu_events";
 510        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 511            .await
 512            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 513
 514        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 515        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 516            .await
 517            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 518
 519        const APP_EVENTS_TABLE: &str = "app_events";
 520        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 521            .await
 522            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 523
 524        const SETTING_EVENTS_TABLE: &str = "setting_events";
 525        Self::upload_to_table(
 526            SETTING_EVENTS_TABLE,
 527            &self.setting_events,
 528            clickhouse_client,
 529        )
 530        .await
 531        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 532
 533        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 534        Self::upload_to_table(
 535            EXTENSION_EVENTS_TABLE,
 536            &self.extension_events,
 537            clickhouse_client,
 538        )
 539        .await
 540        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 541
 542        const EDIT_EVENTS_TABLE: &str = "edit_events";
 543        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 544            .await
 545            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 546
 547        const ACTION_EVENTS_TABLE: &str = "action_events";
 548        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 549            .await
 550            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 551
 552        Ok(())
 553    }
 554
 555    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 556        table: &str,
 557        rows: &[T],
 558        clickhouse_client: &clickhouse::Client,
 559    ) -> anyhow::Result<()> {
 560        if !rows.is_empty() {
 561            let mut insert = clickhouse_client.insert(table)?;
 562
 563            for event in rows {
 564                insert.write(event).await?;
 565            }
 566
 567            insert.end().await?;
 568
 569            let event_count = rows.len();
 570            log::info!(
 571                "wrote {event_count} {event_specifier} to '{table}'",
 572                event_specifier = if event_count == 1 { "event" } else { "events" }
 573            );
 574        }
 575
 576        Ok(())
 577    }
 578}
 579
 580pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 581where
 582    S: Serializer,
 583{
 584    if country_code.len() != 2 {
 585        use serde::ser::Error;
 586        return Err(S::Error::custom(
 587            "country_code must be exactly 2 characters",
 588        ));
 589    }
 590
 591    let country_code = country_code.as_bytes();
 592
 593    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
 594}
 595
 596#[derive(Serialize, Debug, clickhouse::Row)]
 597pub struct EditorEventRow {
 598    pub installation_id: String,
 599    pub operation: String,
 600    pub app_version: String,
 601    pub file_extension: String,
 602    pub os_name: String,
 603    pub os_version: String,
 604    pub release_channel: String,
 605    pub signed_in: bool,
 606    pub vim_mode: bool,
 607    #[serde(serialize_with = "serialize_country_code")]
 608    pub country_code: String,
 609    pub region_code: String,
 610    pub city: String,
 611    pub time: i64,
 612    pub copilot_enabled: bool,
 613    pub copilot_enabled_for_language: bool,
 614    pub historical_event: bool,
 615    pub architecture: String,
 616    pub is_staff: Option<bool>,
 617    pub session_id: Option<String>,
 618    pub major: Option<i32>,
 619    pub minor: Option<i32>,
 620    pub patch: Option<i32>,
 621}
 622
 623impl EditorEventRow {
 624    fn from_event(
 625        event: EditorEvent,
 626        wrapper: &EventWrapper,
 627        body: &EventRequestBody,
 628        first_event_at: chrono::DateTime<chrono::Utc>,
 629        country_code: Option<String>,
 630    ) -> Self {
 631        let semver = body.semver();
 632        let time =
 633            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 634
 635        Self {
 636            app_version: body.app_version.clone(),
 637            major: semver.map(|v| v.major() as i32),
 638            minor: semver.map(|v| v.minor() as i32),
 639            patch: semver.map(|v| v.patch() as i32),
 640            release_channel: body.release_channel.clone().unwrap_or_default(),
 641            os_name: body.os_name.clone(),
 642            os_version: body.os_version.clone().unwrap_or_default(),
 643            architecture: body.architecture.clone(),
 644            installation_id: body.installation_id.clone().unwrap_or_default(),
 645            session_id: body.session_id.clone(),
 646            is_staff: body.is_staff,
 647            time: time.timestamp_millis(),
 648            operation: event.operation,
 649            file_extension: event.file_extension.unwrap_or_default(),
 650            signed_in: wrapper.signed_in,
 651            vim_mode: event.vim_mode,
 652            copilot_enabled: event.copilot_enabled,
 653            copilot_enabled_for_language: event.copilot_enabled_for_language,
 654            country_code: country_code.unwrap_or("XX".to_string()),
 655            region_code: "".to_string(),
 656            city: "".to_string(),
 657            historical_event: false,
 658        }
 659    }
 660}
 661
 662#[derive(Serialize, Debug, clickhouse::Row)]
 663pub struct CopilotEventRow {
 664    pub installation_id: String,
 665    pub suggestion_id: String,
 666    pub suggestion_accepted: bool,
 667    pub app_version: String,
 668    pub file_extension: String,
 669    pub os_name: String,
 670    pub os_version: String,
 671    pub release_channel: String,
 672    pub signed_in: bool,
 673    #[serde(serialize_with = "serialize_country_code")]
 674    pub country_code: String,
 675    pub region_code: String,
 676    pub city: String,
 677    pub time: i64,
 678    pub is_staff: Option<bool>,
 679    pub session_id: Option<String>,
 680    pub major: Option<i32>,
 681    pub minor: Option<i32>,
 682    pub patch: Option<i32>,
 683}
 684
 685impl CopilotEventRow {
 686    fn from_event(
 687        event: CopilotEvent,
 688        wrapper: &EventWrapper,
 689        body: &EventRequestBody,
 690        first_event_at: chrono::DateTime<chrono::Utc>,
 691        country_code: Option<String>,
 692    ) -> Self {
 693        let semver = body.semver();
 694        let time =
 695            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 696
 697        Self {
 698            app_version: body.app_version.clone(),
 699            major: semver.map(|v| v.major() as i32),
 700            minor: semver.map(|v| v.minor() as i32),
 701            patch: semver.map(|v| v.patch() as i32),
 702            release_channel: body.release_channel.clone().unwrap_or_default(),
 703            os_name: body.os_name.clone(),
 704            os_version: body.os_version.clone().unwrap_or_default(),
 705            installation_id: body.installation_id.clone().unwrap_or_default(),
 706            session_id: body.session_id.clone(),
 707            is_staff: body.is_staff,
 708            time: time.timestamp_millis(),
 709            file_extension: event.file_extension.unwrap_or_default(),
 710            signed_in: wrapper.signed_in,
 711            country_code: country_code.unwrap_or("XX".to_string()),
 712            region_code: "".to_string(),
 713            city: "".to_string(),
 714            suggestion_id: event.suggestion_id.unwrap_or_default(),
 715            suggestion_accepted: event.suggestion_accepted,
 716        }
 717    }
 718}
 719
 720#[derive(Serialize, Debug, clickhouse::Row)]
 721pub struct CallEventRow {
 722    // AppInfoBase
 723    app_version: String,
 724    major: Option<i32>,
 725    minor: Option<i32>,
 726    patch: Option<i32>,
 727    release_channel: String,
 728
 729    // ClientEventBase
 730    installation_id: String,
 731    session_id: Option<String>,
 732    is_staff: Option<bool>,
 733    time: i64,
 734
 735    // CallEventRow
 736    operation: String,
 737    room_id: Option<u64>,
 738    channel_id: Option<u64>,
 739}
 740
 741impl CallEventRow {
 742    fn from_event(
 743        event: CallEvent,
 744        wrapper: &EventWrapper,
 745        body: &EventRequestBody,
 746        first_event_at: chrono::DateTime<chrono::Utc>,
 747    ) -> Self {
 748        let semver = body.semver();
 749        let time =
 750            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 751
 752        Self {
 753            app_version: body.app_version.clone(),
 754            major: semver.map(|v| v.major() as i32),
 755            minor: semver.map(|v| v.minor() as i32),
 756            patch: semver.map(|v| v.patch() as i32),
 757            release_channel: body.release_channel.clone().unwrap_or_default(),
 758            installation_id: body.installation_id.clone().unwrap_or_default(),
 759            session_id: body.session_id.clone(),
 760            is_staff: body.is_staff,
 761            time: time.timestamp_millis(),
 762            operation: event.operation,
 763            room_id: event.room_id,
 764            channel_id: event.channel_id,
 765        }
 766    }
 767}
 768
 769#[derive(Serialize, Debug, clickhouse::Row)]
 770pub struct AssistantEventRow {
 771    // AppInfoBase
 772    app_version: String,
 773    major: Option<i32>,
 774    minor: Option<i32>,
 775    patch: Option<i32>,
 776    release_channel: String,
 777
 778    // ClientEventBase
 779    installation_id: Option<String>,
 780    session_id: Option<String>,
 781    is_staff: Option<bool>,
 782    time: i64,
 783
 784    // AssistantEventRow
 785    conversation_id: String,
 786    kind: String,
 787    model: String,
 788}
 789
 790impl AssistantEventRow {
 791    fn from_event(
 792        event: AssistantEvent,
 793        wrapper: &EventWrapper,
 794        body: &EventRequestBody,
 795        first_event_at: chrono::DateTime<chrono::Utc>,
 796    ) -> Self {
 797        let semver = body.semver();
 798        let time =
 799            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 800
 801        Self {
 802            app_version: body.app_version.clone(),
 803            major: semver.map(|v| v.major() as i32),
 804            minor: semver.map(|v| v.minor() as i32),
 805            patch: semver.map(|v| v.patch() as i32),
 806            release_channel: body.release_channel.clone().unwrap_or_default(),
 807            installation_id: body.installation_id.clone(),
 808            session_id: body.session_id.clone(),
 809            is_staff: body.is_staff,
 810            time: time.timestamp_millis(),
 811            conversation_id: event.conversation_id.unwrap_or_default(),
 812            kind: event.kind.to_string(),
 813            model: event.model,
 814        }
 815    }
 816}
 817
 818#[derive(Debug, clickhouse::Row, Serialize)]
 819pub struct CpuEventRow {
 820    pub installation_id: Option<String>,
 821    pub is_staff: Option<bool>,
 822    pub usage_as_percentage: f32,
 823    pub core_count: u32,
 824    pub app_version: String,
 825    pub release_channel: String,
 826    pub time: i64,
 827    pub session_id: Option<String>,
 828    // pub normalized_cpu_usage: f64, MATERIALIZED
 829    pub major: Option<i32>,
 830    pub minor: Option<i32>,
 831    pub patch: Option<i32>,
 832}
 833
 834impl CpuEventRow {
 835    fn from_event(
 836        event: CpuEvent,
 837        wrapper: &EventWrapper,
 838        body: &EventRequestBody,
 839        first_event_at: chrono::DateTime<chrono::Utc>,
 840    ) -> Self {
 841        let semver = body.semver();
 842        let time =
 843            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 844
 845        Self {
 846            app_version: body.app_version.clone(),
 847            major: semver.map(|v| v.major() as i32),
 848            minor: semver.map(|v| v.minor() as i32),
 849            patch: semver.map(|v| v.patch() as i32),
 850            release_channel: body.release_channel.clone().unwrap_or_default(),
 851            installation_id: body.installation_id.clone(),
 852            session_id: body.session_id.clone(),
 853            is_staff: body.is_staff,
 854            time: time.timestamp_millis(),
 855            usage_as_percentage: event.usage_as_percentage,
 856            core_count: event.core_count,
 857        }
 858    }
 859}
 860
 861#[derive(Serialize, Debug, clickhouse::Row)]
 862pub struct MemoryEventRow {
 863    // AppInfoBase
 864    app_version: String,
 865    major: Option<i32>,
 866    minor: Option<i32>,
 867    patch: Option<i32>,
 868    release_channel: String,
 869
 870    // ClientEventBase
 871    installation_id: Option<String>,
 872    session_id: Option<String>,
 873    is_staff: Option<bool>,
 874    time: i64,
 875
 876    // MemoryEventRow
 877    memory_in_bytes: u64,
 878    virtual_memory_in_bytes: u64,
 879}
 880
 881impl MemoryEventRow {
 882    fn from_event(
 883        event: MemoryEvent,
 884        wrapper: &EventWrapper,
 885        body: &EventRequestBody,
 886        first_event_at: chrono::DateTime<chrono::Utc>,
 887    ) -> Self {
 888        let semver = body.semver();
 889        let time =
 890            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 891
 892        Self {
 893            app_version: body.app_version.clone(),
 894            major: semver.map(|v| v.major() as i32),
 895            minor: semver.map(|v| v.minor() as i32),
 896            patch: semver.map(|v| v.patch() as i32),
 897            release_channel: body.release_channel.clone().unwrap_or_default(),
 898            installation_id: body.installation_id.clone(),
 899            session_id: body.session_id.clone(),
 900            is_staff: body.is_staff,
 901            time: time.timestamp_millis(),
 902            memory_in_bytes: event.memory_in_bytes,
 903            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 904        }
 905    }
 906}
 907
 908#[derive(Serialize, Debug, clickhouse::Row)]
 909pub struct AppEventRow {
 910    // AppInfoBase
 911    app_version: String,
 912    major: Option<i32>,
 913    minor: Option<i32>,
 914    patch: Option<i32>,
 915    release_channel: String,
 916
 917    // ClientEventBase
 918    installation_id: Option<String>,
 919    session_id: Option<String>,
 920    is_staff: Option<bool>,
 921    time: i64,
 922
 923    // AppEventRow
 924    operation: String,
 925}
 926
 927impl AppEventRow {
 928    fn from_event(
 929        event: AppEvent,
 930        wrapper: &EventWrapper,
 931        body: &EventRequestBody,
 932        first_event_at: chrono::DateTime<chrono::Utc>,
 933    ) -> Self {
 934        let semver = body.semver();
 935        let time =
 936            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 937
 938        Self {
 939            app_version: body.app_version.clone(),
 940            major: semver.map(|v| v.major() as i32),
 941            minor: semver.map(|v| v.minor() as i32),
 942            patch: semver.map(|v| v.patch() as i32),
 943            release_channel: body.release_channel.clone().unwrap_or_default(),
 944            installation_id: body.installation_id.clone(),
 945            session_id: body.session_id.clone(),
 946            is_staff: body.is_staff,
 947            time: time.timestamp_millis(),
 948            operation: event.operation,
 949        }
 950    }
 951}
 952
 953#[derive(Serialize, Debug, clickhouse::Row)]
 954pub struct SettingEventRow {
 955    // AppInfoBase
 956    app_version: String,
 957    major: Option<i32>,
 958    minor: Option<i32>,
 959    patch: Option<i32>,
 960    release_channel: String,
 961
 962    // ClientEventBase
 963    installation_id: Option<String>,
 964    session_id: Option<String>,
 965    is_staff: Option<bool>,
 966    time: i64,
 967    // SettingEventRow
 968    setting: String,
 969    value: String,
 970}
 971
 972impl SettingEventRow {
 973    fn from_event(
 974        event: SettingEvent,
 975        wrapper: &EventWrapper,
 976        body: &EventRequestBody,
 977        first_event_at: chrono::DateTime<chrono::Utc>,
 978    ) -> Self {
 979        let semver = body.semver();
 980        let time =
 981            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 982
 983        Self {
 984            app_version: body.app_version.clone(),
 985            major: semver.map(|v| v.major() as i32),
 986            minor: semver.map(|v| v.minor() as i32),
 987            patch: semver.map(|v| v.patch() as i32),
 988            release_channel: body.release_channel.clone().unwrap_or_default(),
 989            installation_id: body.installation_id.clone(),
 990            session_id: body.session_id.clone(),
 991            is_staff: body.is_staff,
 992            time: time.timestamp_millis(),
 993            setting: event.setting,
 994            value: event.value,
 995        }
 996    }
 997}
 998
 999#[derive(Serialize, Debug, clickhouse::Row)]
1000pub struct ExtensionEventRow {
1001    // AppInfoBase
1002    app_version: String,
1003    major: Option<i32>,
1004    minor: Option<i32>,
1005    patch: Option<i32>,
1006    release_channel: String,
1007
1008    // ClientEventBase
1009    installation_id: Option<String>,
1010    session_id: Option<String>,
1011    is_staff: Option<bool>,
1012    time: i64,
1013
1014    // ExtensionEventRow
1015    extension_id: Arc<str>,
1016    extension_version: Arc<str>,
1017    dev: bool,
1018    schema_version: Option<i32>,
1019    wasm_api_version: Option<String>,
1020}
1021
1022impl ExtensionEventRow {
1023    fn from_event(
1024        event: ExtensionEvent,
1025        wrapper: &EventWrapper,
1026        body: &EventRequestBody,
1027        extension_metadata: Option<ExtensionMetadata>,
1028        first_event_at: chrono::DateTime<chrono::Utc>,
1029    ) -> Self {
1030        let semver = body.semver();
1031        let time =
1032            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1033
1034        Self {
1035            app_version: body.app_version.clone(),
1036            major: semver.map(|v| v.major() as i32),
1037            minor: semver.map(|v| v.minor() as i32),
1038            patch: semver.map(|v| v.patch() as i32),
1039            release_channel: body.release_channel.clone().unwrap_or_default(),
1040            installation_id: body.installation_id.clone(),
1041            session_id: body.session_id.clone(),
1042            is_staff: body.is_staff,
1043            time: time.timestamp_millis(),
1044            extension_id: event.extension_id,
1045            extension_version: event.version,
1046            dev: extension_metadata.is_none(),
1047            schema_version: extension_metadata
1048                .as_ref()
1049                .and_then(|metadata| metadata.manifest.schema_version),
1050            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
1051                metadata
1052                    .manifest
1053                    .wasm_api_version
1054                    .as_ref()
1055                    .map(|version| version.to_string())
1056            }),
1057        }
1058    }
1059}
1060
1061#[derive(Serialize, Debug, clickhouse::Row)]
1062pub struct EditEventRow {
1063    // AppInfoBase
1064    app_version: String,
1065    major: Option<i32>,
1066    minor: Option<i32>,
1067    patch: Option<i32>,
1068    release_channel: String,
1069
1070    // ClientEventBase
1071    installation_id: Option<String>,
1072    // Note: This column name has a typo in the ClickHouse table.
1073    #[serde(rename = "sesssion_id")]
1074    session_id: Option<String>,
1075    is_staff: Option<bool>,
1076    time: i64,
1077
1078    // EditEventRow
1079    period_start: i64,
1080    period_end: i64,
1081    environment: String,
1082}
1083
1084impl EditEventRow {
1085    fn from_event(
1086        event: EditEvent,
1087        wrapper: &EventWrapper,
1088        body: &EventRequestBody,
1089        first_event_at: chrono::DateTime<chrono::Utc>,
1090    ) -> Self {
1091        let semver = body.semver();
1092        let time =
1093            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1094
1095        let period_start = time - chrono::Duration::milliseconds(event.duration);
1096        let period_end = time;
1097
1098        Self {
1099            app_version: body.app_version.clone(),
1100            major: semver.map(|v| v.major() as i32),
1101            minor: semver.map(|v| v.minor() as i32),
1102            patch: semver.map(|v| v.patch() as i32),
1103            release_channel: body.release_channel.clone().unwrap_or_default(),
1104            installation_id: body.installation_id.clone(),
1105            session_id: body.session_id.clone(),
1106            is_staff: body.is_staff,
1107            time: time.timestamp_millis(),
1108            period_start: period_start.timestamp_millis(),
1109            period_end: period_end.timestamp_millis(),
1110            environment: event.environment,
1111        }
1112    }
1113}
1114
1115#[derive(Serialize, Debug, clickhouse::Row)]
1116pub struct ActionEventRow {
1117    // AppInfoBase
1118    app_version: String,
1119    major: Option<i32>,
1120    minor: Option<i32>,
1121    patch: Option<i32>,
1122    release_channel: String,
1123
1124    // ClientEventBase
1125    installation_id: Option<String>,
1126    // Note: This column name has a typo in the ClickHouse table.
1127    #[serde(rename = "sesssion_id")]
1128    session_id: Option<String>,
1129    is_staff: Option<bool>,
1130    time: i64,
1131    // ActionEventRow
1132    source: String,
1133    action: String,
1134}
1135
1136impl ActionEventRow {
1137    fn from_event(
1138        event: ActionEvent,
1139        wrapper: &EventWrapper,
1140        body: &EventRequestBody,
1141        first_event_at: chrono::DateTime<chrono::Utc>,
1142    ) -> Self {
1143        let semver = body.semver();
1144        let time =
1145            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1146
1147        Self {
1148            app_version: body.app_version.clone(),
1149            major: semver.map(|v| v.major() as i32),
1150            minor: semver.map(|v| v.minor() as i32),
1151            patch: semver.map(|v| v.patch() as i32),
1152            release_channel: body.release_channel.clone().unwrap_or_default(),
1153            installation_id: body.installation_id.clone(),
1154            session_id: body.session_id.clone(),
1155            is_staff: body.is_staff,
1156            time: time.timestamp_millis(),
1157            source: event.source,
1158            action: event.action,
1159        }
1160    }
1161}
1162
1163pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
1164    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
1165        return None;
1166    };
1167
1168    let mut summer = Sha256::new();
1169    summer.update(checksum_seed);
1170    summer.update(&json);
1171    summer.update(checksum_seed);
1172    Some(summer.finalize().into_iter().collect())
1173}