events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use serde::{Serialize, Serializer};
  14use sha2::{Digest, Sha256};
  15use std::sync::{Arc, OnceLock};
  16use telemetry_events::{
  17    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
  18    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
  19};
  20use util::SemanticVersion;
  21
  22pub fn router() -> Router {
  23    Router::new()
  24        .route("/telemetry/events", post(post_events))
  25        .route("/telemetry/crashes", post(post_crash))
  26}
  27
  28pub struct ZedChecksumHeader(Vec<u8>);
  29
  30impl Header for ZedChecksumHeader {
  31    fn name() -> &'static HeaderName {
  32        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  33        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  34    }
  35
  36    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  37    where
  38        Self: Sized,
  39        I: Iterator<Item = &'i axum::http::HeaderValue>,
  40    {
  41        let checksum = values
  42            .next()
  43            .ok_or_else(axum::headers::Error::invalid)?
  44            .to_str()
  45            .map_err(|_| axum::headers::Error::invalid())?;
  46
  47        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  48        Ok(Self(bytes))
  49    }
  50
  51    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  52        unimplemented!()
  53    }
  54}
  55
  56pub struct CloudflareIpCountryHeader(String);
  57
  58impl Header for CloudflareIpCountryHeader {
  59    fn name() -> &'static HeaderName {
  60        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  61        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  62    }
  63
  64    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  65    where
  66        Self: Sized,
  67        I: Iterator<Item = &'i axum::http::HeaderValue>,
  68    {
  69        let country_code = values
  70            .next()
  71            .ok_or_else(axum::headers::Error::invalid)?
  72            .to_str()
  73            .map_err(|_| axum::headers::Error::invalid())?;
  74
  75        Ok(Self(country_code.to_string()))
  76    }
  77
  78    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  79        unimplemented!()
  80    }
  81}
  82
  83pub async fn post_crash(
  84    Extension(app): Extension<Arc<AppState>>,
  85    headers: HeaderMap,
  86    body: Bytes,
  87) -> Result<()> {
  88    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  89
  90    let report = IpsFile::parse(&body)?;
  91    let version_threshold = SemanticVersion::new(0, 123, 0);
  92
  93    let bundle_id = &report.header.bundle_id;
  94    let app_version = &report.app_version();
  95
  96    if bundle_id == "dev.zed.Zed-Dev" {
  97        log::error!("Crash uploads from {} are ignored.", bundle_id);
  98        return Ok(());
  99    }
 100
 101    if app_version.is_none() || app_version.unwrap() < version_threshold {
 102        log::error!(
 103            "Crash uploads from {} are ignored.",
 104            report.header.app_version
 105        );
 106        return Ok(());
 107    }
 108    let app_version = app_version.unwrap();
 109
 110    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 111        let response = blob_store_client
 112            .head_object()
 113            .bucket(CRASH_REPORTS_BUCKET)
 114            .key(report.header.incident_id.clone() + ".ips")
 115            .send()
 116            .await;
 117
 118        if response.is_ok() {
 119            log::info!("We've already uploaded this crash");
 120            return Ok(());
 121        }
 122
 123        blob_store_client
 124            .put_object()
 125            .bucket(CRASH_REPORTS_BUCKET)
 126            .key(report.header.incident_id.clone() + ".ips")
 127            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 128            .body(ByteStream::from(body.to_vec()))
 129            .send()
 130            .await
 131            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 132            .ok();
 133    }
 134
 135    let recent_panic_on: Option<i64> = headers
 136        .get("x-zed-panicked-on")
 137        .and_then(|h| h.to_str().ok())
 138        .and_then(|s| s.parse().ok());
 139    let mut recent_panic = None;
 140
 141    if let Some(recent_panic_on) = recent_panic_on {
 142        let crashed_at = match report.timestamp() {
 143            Ok(t) => Some(t),
 144            Err(e) => {
 145                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 146                None
 147            }
 148        };
 149        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 150            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 151        }
 152    }
 153
 154    let description = report.description(recent_panic);
 155    let summary = report.backtrace_summary();
 156
 157    tracing::error!(
 158        service = "client",
 159        version = %report.header.app_version,
 160        os_version = %report.header.os_version,
 161        bundle_id = %report.header.bundle_id,
 162        incident_id = %report.header.incident_id,
 163        description = %description,
 164        backtrace = %summary,
 165        "crash report");
 166
 167    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 168        let payload = slack::WebhookBody::new(|w| {
 169            w.add_section(|s| s.text(slack::Text::markdown(description)))
 170                .add_section(|s| {
 171                    s.add_field(slack::Text::markdown(format!(
 172                        "*Version:*\n{} ({})",
 173                        bundle_id, app_version
 174                    )))
 175                    .add_field({
 176                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 177                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 178                            hostname.strip_prefix("http://").unwrap_or_default()
 179                        });
 180
 181                        slack::Text::markdown(format!(
 182                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 183                            CRASH_REPORTS_BUCKET,
 184                            hostname,
 185                            report.header.incident_id,
 186                            report
 187                                .header
 188                                .incident_id
 189                                .chars()
 190                                .take(8)
 191                                .collect::<String>(),
 192                        ))
 193                    })
 194                })
 195                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 196        });
 197        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 198            log::error!("Failed to serialize payload to JSON: {err}");
 199            Error::Internal(anyhow!(err))
 200        })?;
 201
 202        reqwest::Client::new()
 203            .post(slack_panics_webhook)
 204            .header("Content-Type", "application/json")
 205            .body(payload_json)
 206            .send()
 207            .await
 208            .map_err(|err| {
 209                log::error!("Failed to send payload to Slack: {err}");
 210                Error::Internal(anyhow!(err))
 211            })?;
 212    }
 213
 214    Ok(())
 215}
 216
 217pub async fn post_events(
 218    Extension(app): Extension<Arc<AppState>>,
 219    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 220    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 221    body: Bytes,
 222) -> Result<()> {
 223    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 224        Err(Error::Http(
 225            StatusCode::NOT_IMPLEMENTED,
 226            "not supported".into(),
 227        ))?
 228    };
 229
 230    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
 231        return Err(Error::Http(
 232            StatusCode::INTERNAL_SERVER_ERROR,
 233            "events not enabled".into(),
 234        ))?;
 235    };
 236
 237    let mut summer = Sha256::new();
 238    summer.update(checksum_seed);
 239    summer.update(&body);
 240    summer.update(checksum_seed);
 241
 242    if &checksum != &summer.finalize()[..] {
 243        return Err(Error::Http(
 244            StatusCode::BAD_REQUEST,
 245            "invalid checksum".into(),
 246        ))?;
 247    }
 248
 249    let request_body: telemetry_events::EventRequestBody =
 250        serde_json::from_slice(&body).map_err(|err| {
 251            log::error!("can't parse event json: {err}");
 252            Error::Internal(anyhow!(err))
 253        })?;
 254
 255    let mut to_upload = ToUpload::default();
 256    let Some(last_event) = request_body.events.last() else {
 257        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 258    };
 259    let country_code = country_code_header.map(|h| h.0 .0);
 260
 261    let first_event_at = chrono::Utc::now()
 262        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 263
 264    for wrapper in &request_body.events {
 265        match &wrapper.event {
 266            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 267                event.clone(),
 268                &wrapper,
 269                &request_body,
 270                first_event_at,
 271                country_code.clone(),
 272            )),
 273            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
 274                event.clone(),
 275                &wrapper,
 276                &request_body,
 277                first_event_at,
 278                country_code.clone(),
 279            )),
 280            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 281                event.clone(),
 282                &wrapper,
 283                &request_body,
 284                first_event_at,
 285            )),
 286            Event::Assistant(event) => {
 287                to_upload
 288                    .assistant_events
 289                    .push(AssistantEventRow::from_event(
 290                        event.clone(),
 291                        &wrapper,
 292                        &request_body,
 293                        first_event_at,
 294                    ))
 295            }
 296            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 297                event.clone(),
 298                &wrapper,
 299                &request_body,
 300                first_event_at,
 301            )),
 302            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 303                event.clone(),
 304                &wrapper,
 305                &request_body,
 306                first_event_at,
 307            )),
 308            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 309                event.clone(),
 310                &wrapper,
 311                &request_body,
 312                first_event_at,
 313            )),
 314            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 315                event.clone(),
 316                &wrapper,
 317                &request_body,
 318                first_event_at,
 319            )),
 320            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 321                event.clone(),
 322                &wrapper,
 323                &request_body,
 324                first_event_at,
 325            )),
 326            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 327                event.clone(),
 328                &wrapper,
 329                &request_body,
 330                first_event_at,
 331            )),
 332            Event::Extension(event) => {
 333                let metadata = app
 334                    .db
 335                    .get_extension_version(&event.extension_id, &event.version)
 336                    .await?;
 337                to_upload
 338                    .extension_events
 339                    .push(ExtensionEventRow::from_event(
 340                        event.clone(),
 341                        &wrapper,
 342                        &request_body,
 343                        metadata,
 344                        first_event_at,
 345                    ))
 346            }
 347        }
 348    }
 349
 350    to_upload
 351        .upload(&clickhouse_client)
 352        .await
 353        .map_err(|err| Error::Internal(anyhow!(err)))?;
 354
 355    Ok(())
 356}
 357
 358#[derive(Default)]
 359struct ToUpload {
 360    editor_events: Vec<EditorEventRow>,
 361    copilot_events: Vec<CopilotEventRow>,
 362    assistant_events: Vec<AssistantEventRow>,
 363    call_events: Vec<CallEventRow>,
 364    cpu_events: Vec<CpuEventRow>,
 365    memory_events: Vec<MemoryEventRow>,
 366    app_events: Vec<AppEventRow>,
 367    setting_events: Vec<SettingEventRow>,
 368    extension_events: Vec<ExtensionEventRow>,
 369    edit_events: Vec<EditEventRow>,
 370    action_events: Vec<ActionEventRow>,
 371}
 372
 373impl ToUpload {
 374    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 375        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 376        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 377            .await
 378            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 379
 380        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
 381        Self::upload_to_table(
 382            COPILOT_EVENTS_TABLE,
 383            &self.copilot_events,
 384            clickhouse_client,
 385        )
 386        .await
 387        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
 388
 389        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 390        Self::upload_to_table(
 391            ASSISTANT_EVENTS_TABLE,
 392            &self.assistant_events,
 393            clickhouse_client,
 394        )
 395        .await
 396        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 397
 398        const CALL_EVENTS_TABLE: &str = "call_events";
 399        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 400            .await
 401            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 402
 403        const CPU_EVENTS_TABLE: &str = "cpu_events";
 404        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 405            .await
 406            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 407
 408        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 409        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 410            .await
 411            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 412
 413        const APP_EVENTS_TABLE: &str = "app_events";
 414        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 415            .await
 416            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 417
 418        const SETTING_EVENTS_TABLE: &str = "setting_events";
 419        Self::upload_to_table(
 420            SETTING_EVENTS_TABLE,
 421            &self.setting_events,
 422            clickhouse_client,
 423        )
 424        .await
 425        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 426
 427        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 428        Self::upload_to_table(
 429            EXTENSION_EVENTS_TABLE,
 430            &self.extension_events,
 431            clickhouse_client,
 432        )
 433        .await
 434        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 435
 436        const EDIT_EVENTS_TABLE: &str = "edit_events";
 437        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 438            .await
 439            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 440
 441        const ACTION_EVENTS_TABLE: &str = "action_events";
 442        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 443            .await
 444            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 445
 446        Ok(())
 447    }
 448
 449    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 450        table: &str,
 451        rows: &[T],
 452        clickhouse_client: &clickhouse::Client,
 453    ) -> anyhow::Result<()> {
 454        if !rows.is_empty() {
 455            let mut insert = clickhouse_client.insert(table)?;
 456
 457            for event in rows {
 458                insert.write(event).await?;
 459            }
 460
 461            insert.end().await?;
 462        }
 463
 464        Ok(())
 465    }
 466}
 467
 468pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 469where
 470    S: Serializer,
 471{
 472    if country_code.len() != 2 {
 473        use serde::ser::Error;
 474        return Err(S::Error::custom(
 475            "country_code must be exactly 2 characters",
 476        ));
 477    }
 478
 479    let country_code = country_code.as_bytes();
 480
 481    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
 482}
 483
 484#[derive(Serialize, Debug, clickhouse::Row)]
 485pub struct EditorEventRow {
 486    pub installation_id: String,
 487    pub operation: String,
 488    pub app_version: String,
 489    pub file_extension: String,
 490    pub os_name: String,
 491    pub os_version: String,
 492    pub release_channel: String,
 493    pub signed_in: bool,
 494    pub vim_mode: bool,
 495    #[serde(serialize_with = "serialize_country_code")]
 496    pub country_code: String,
 497    pub region_code: String,
 498    pub city: String,
 499    pub time: i64,
 500    pub copilot_enabled: bool,
 501    pub copilot_enabled_for_language: bool,
 502    pub historical_event: bool,
 503    pub architecture: String,
 504    pub is_staff: Option<bool>,
 505    pub session_id: Option<String>,
 506    pub major: Option<i32>,
 507    pub minor: Option<i32>,
 508    pub patch: Option<i32>,
 509}
 510
 511impl EditorEventRow {
 512    fn from_event(
 513        event: EditorEvent,
 514        wrapper: &EventWrapper,
 515        body: &EventRequestBody,
 516        first_event_at: chrono::DateTime<chrono::Utc>,
 517        country_code: Option<String>,
 518    ) -> Self {
 519        let semver = body.semver();
 520        let time =
 521            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 522
 523        Self {
 524            app_version: body.app_version.clone(),
 525            major: semver.map(|s| s.major as i32),
 526            minor: semver.map(|s| s.minor as i32),
 527            patch: semver.map(|s| s.patch as i32),
 528            release_channel: body.release_channel.clone().unwrap_or_default(),
 529            os_name: body.os_name.clone(),
 530            os_version: body.os_version.clone().unwrap_or_default(),
 531            architecture: body.architecture.clone(),
 532            installation_id: body.installation_id.clone().unwrap_or_default(),
 533            session_id: body.session_id.clone(),
 534            is_staff: body.is_staff,
 535            time: time.timestamp_millis(),
 536            operation: event.operation,
 537            file_extension: event.file_extension.unwrap_or_default(),
 538            signed_in: wrapper.signed_in,
 539            vim_mode: event.vim_mode,
 540            copilot_enabled: event.copilot_enabled,
 541            copilot_enabled_for_language: event.copilot_enabled_for_language,
 542            country_code: country_code.unwrap_or("XX".to_string()),
 543            region_code: "".to_string(),
 544            city: "".to_string(),
 545            historical_event: false,
 546        }
 547    }
 548}
 549
 550#[derive(Serialize, Debug, clickhouse::Row)]
 551pub struct CopilotEventRow {
 552    pub installation_id: String,
 553    pub suggestion_id: String,
 554    pub suggestion_accepted: bool,
 555    pub app_version: String,
 556    pub file_extension: String,
 557    pub os_name: String,
 558    pub os_version: String,
 559    pub release_channel: String,
 560    pub signed_in: bool,
 561    #[serde(serialize_with = "serialize_country_code")]
 562    pub country_code: String,
 563    pub region_code: String,
 564    pub city: String,
 565    pub time: i64,
 566    pub is_staff: Option<bool>,
 567    pub session_id: Option<String>,
 568    pub major: Option<i32>,
 569    pub minor: Option<i32>,
 570    pub patch: Option<i32>,
 571}
 572
 573impl CopilotEventRow {
 574    fn from_event(
 575        event: CopilotEvent,
 576        wrapper: &EventWrapper,
 577        body: &EventRequestBody,
 578        first_event_at: chrono::DateTime<chrono::Utc>,
 579        country_code: Option<String>,
 580    ) -> Self {
 581        let semver = body.semver();
 582        let time =
 583            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 584
 585        Self {
 586            app_version: body.app_version.clone(),
 587            major: semver.map(|s| s.major as i32),
 588            minor: semver.map(|s| s.minor as i32),
 589            patch: semver.map(|s| s.patch as i32),
 590            release_channel: body.release_channel.clone().unwrap_or_default(),
 591            os_name: body.os_name.clone(),
 592            os_version: body.os_version.clone().unwrap_or_default(),
 593            installation_id: body.installation_id.clone().unwrap_or_default(),
 594            session_id: body.session_id.clone(),
 595            is_staff: body.is_staff,
 596            time: time.timestamp_millis(),
 597            file_extension: event.file_extension.unwrap_or_default(),
 598            signed_in: wrapper.signed_in,
 599            country_code: country_code.unwrap_or("XX".to_string()),
 600            region_code: "".to_string(),
 601            city: "".to_string(),
 602            suggestion_id: event.suggestion_id.unwrap_or_default(),
 603            suggestion_accepted: event.suggestion_accepted,
 604        }
 605    }
 606}
 607
 608#[derive(Serialize, Debug, clickhouse::Row)]
 609pub struct CallEventRow {
 610    // AppInfoBase
 611    app_version: String,
 612    major: Option<i32>,
 613    minor: Option<i32>,
 614    patch: Option<i32>,
 615    release_channel: String,
 616
 617    // ClientEventBase
 618    installation_id: String,
 619    session_id: Option<String>,
 620    is_staff: Option<bool>,
 621    time: i64,
 622
 623    // CallEventRow
 624    operation: String,
 625    room_id: Option<u64>,
 626    channel_id: Option<u64>,
 627}
 628
 629impl CallEventRow {
 630    fn from_event(
 631        event: CallEvent,
 632        wrapper: &EventWrapper,
 633        body: &EventRequestBody,
 634        first_event_at: chrono::DateTime<chrono::Utc>,
 635    ) -> Self {
 636        let semver = body.semver();
 637        let time =
 638            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 639
 640        Self {
 641            app_version: body.app_version.clone(),
 642            major: semver.map(|s| s.major as i32),
 643            minor: semver.map(|s| s.minor as i32),
 644            patch: semver.map(|s| s.patch as i32),
 645            release_channel: body.release_channel.clone().unwrap_or_default(),
 646            installation_id: body.installation_id.clone().unwrap_or_default(),
 647            session_id: body.session_id.clone(),
 648            is_staff: body.is_staff,
 649            time: time.timestamp_millis(),
 650            operation: event.operation,
 651            room_id: event.room_id,
 652            channel_id: event.channel_id,
 653        }
 654    }
 655}
 656
 657#[derive(Serialize, Debug, clickhouse::Row)]
 658pub struct AssistantEventRow {
 659    // AppInfoBase
 660    app_version: String,
 661    major: Option<i32>,
 662    minor: Option<i32>,
 663    patch: Option<i32>,
 664    release_channel: String,
 665
 666    // ClientEventBase
 667    installation_id: Option<String>,
 668    session_id: Option<String>,
 669    is_staff: Option<bool>,
 670    time: i64,
 671
 672    // AssistantEventRow
 673    conversation_id: String,
 674    kind: String,
 675    model: String,
 676}
 677
 678impl AssistantEventRow {
 679    fn from_event(
 680        event: AssistantEvent,
 681        wrapper: &EventWrapper,
 682        body: &EventRequestBody,
 683        first_event_at: chrono::DateTime<chrono::Utc>,
 684    ) -> Self {
 685        let semver = body.semver();
 686        let time =
 687            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 688
 689        Self {
 690            app_version: body.app_version.clone(),
 691            major: semver.map(|s| s.major as i32),
 692            minor: semver.map(|s| s.minor as i32),
 693            patch: semver.map(|s| s.patch as i32),
 694            release_channel: body.release_channel.clone().unwrap_or_default(),
 695            installation_id: body.installation_id.clone(),
 696            session_id: body.session_id.clone(),
 697            is_staff: body.is_staff,
 698            time: time.timestamp_millis(),
 699            conversation_id: event.conversation_id.unwrap_or_default(),
 700            kind: event.kind.to_string(),
 701            model: event.model,
 702        }
 703    }
 704}
 705
 706#[derive(Debug, clickhouse::Row, Serialize)]
 707pub struct CpuEventRow {
 708    pub installation_id: Option<String>,
 709    pub is_staff: Option<bool>,
 710    pub usage_as_percentage: f32,
 711    pub core_count: u32,
 712    pub app_version: String,
 713    pub release_channel: String,
 714    pub time: i64,
 715    pub session_id: Option<String>,
 716    // pub normalized_cpu_usage: f64, MATERIALIZED
 717    pub major: Option<i32>,
 718    pub minor: Option<i32>,
 719    pub patch: Option<i32>,
 720}
 721
 722impl CpuEventRow {
 723    fn from_event(
 724        event: CpuEvent,
 725        wrapper: &EventWrapper,
 726        body: &EventRequestBody,
 727        first_event_at: chrono::DateTime<chrono::Utc>,
 728    ) -> Self {
 729        let semver = body.semver();
 730        let time =
 731            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 732
 733        Self {
 734            app_version: body.app_version.clone(),
 735            major: semver.map(|s| s.major as i32),
 736            minor: semver.map(|s| s.minor as i32),
 737            patch: semver.map(|s| s.patch as i32),
 738            release_channel: body.release_channel.clone().unwrap_or_default(),
 739            installation_id: body.installation_id.clone(),
 740            session_id: body.session_id.clone(),
 741            is_staff: body.is_staff,
 742            time: time.timestamp_millis(),
 743            usage_as_percentage: event.usage_as_percentage,
 744            core_count: event.core_count,
 745        }
 746    }
 747}
 748
 749#[derive(Serialize, Debug, clickhouse::Row)]
 750pub struct MemoryEventRow {
 751    // AppInfoBase
 752    app_version: String,
 753    major: Option<i32>,
 754    minor: Option<i32>,
 755    patch: Option<i32>,
 756    release_channel: String,
 757
 758    // ClientEventBase
 759    installation_id: Option<String>,
 760    session_id: Option<String>,
 761    is_staff: Option<bool>,
 762    time: i64,
 763
 764    // MemoryEventRow
 765    memory_in_bytes: u64,
 766    virtual_memory_in_bytes: u64,
 767}
 768
 769impl MemoryEventRow {
 770    fn from_event(
 771        event: MemoryEvent,
 772        wrapper: &EventWrapper,
 773        body: &EventRequestBody,
 774        first_event_at: chrono::DateTime<chrono::Utc>,
 775    ) -> Self {
 776        let semver = body.semver();
 777        let time =
 778            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 779
 780        Self {
 781            app_version: body.app_version.clone(),
 782            major: semver.map(|s| s.major as i32),
 783            minor: semver.map(|s| s.minor as i32),
 784            patch: semver.map(|s| s.patch as i32),
 785            release_channel: body.release_channel.clone().unwrap_or_default(),
 786            installation_id: body.installation_id.clone(),
 787            session_id: body.session_id.clone(),
 788            is_staff: body.is_staff,
 789            time: time.timestamp_millis(),
 790            memory_in_bytes: event.memory_in_bytes,
 791            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 792        }
 793    }
 794}
 795
 796#[derive(Serialize, Debug, clickhouse::Row)]
 797pub struct AppEventRow {
 798    // AppInfoBase
 799    app_version: String,
 800    major: Option<i32>,
 801    minor: Option<i32>,
 802    patch: Option<i32>,
 803    release_channel: String,
 804
 805    // ClientEventBase
 806    installation_id: Option<String>,
 807    session_id: Option<String>,
 808    is_staff: Option<bool>,
 809    time: i64,
 810
 811    // AppEventRow
 812    operation: String,
 813}
 814
 815impl AppEventRow {
 816    fn from_event(
 817        event: AppEvent,
 818        wrapper: &EventWrapper,
 819        body: &EventRequestBody,
 820        first_event_at: chrono::DateTime<chrono::Utc>,
 821    ) -> Self {
 822        let semver = body.semver();
 823        let time =
 824            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 825
 826        Self {
 827            app_version: body.app_version.clone(),
 828            major: semver.map(|s| s.major as i32),
 829            minor: semver.map(|s| s.minor as i32),
 830            patch: semver.map(|s| s.patch as i32),
 831            release_channel: body.release_channel.clone().unwrap_or_default(),
 832            installation_id: body.installation_id.clone(),
 833            session_id: body.session_id.clone(),
 834            is_staff: body.is_staff,
 835            time: time.timestamp_millis(),
 836            operation: event.operation,
 837        }
 838    }
 839}
 840
/// One setting telemetry event flattened into a single ClickHouse row
/// (the struct derives `clickhouse::Row`).
///
/// Instances are built by `SettingEventRow::from_event`, which merges the
/// event's setting/value pair with the metadata shared by the request body.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    /// App version string, cloned as-is from the request body.
    app_version: String,
    /// Major version component; `None` when `body.semver()` returns `None`.
    major: Option<i32>,
    /// Minor version component; `None` when `body.semver()` returns `None`.
    minor: Option<i32>,
    /// Patch version component; `None` when `body.semver()` returns `None`.
    patch: Option<i32>,
    /// Release channel from the request body; empty string when the body has none.
    release_channel: String,

    // ClientEventBase
    /// Installation id reported in the request body, if any.
    installation_id: Option<String>,
    /// Session id reported in the request body, if any.
    session_id: Option<String>,
    /// Staff flag reported in the request body, if any.
    is_staff: Option<bool>,
    /// Event time in Unix milliseconds: `first_event_at` plus the wrapper's
    /// `milliseconds_since_first_event` offset.
    time: i64,
    // SettingEventRow
    /// The `setting` field of the originating `SettingEvent`.
    setting: String,
    /// The `value` field of the originating `SettingEvent`.
    value: String,
}
 859
 860impl SettingEventRow {
 861    fn from_event(
 862        event: SettingEvent,
 863        wrapper: &EventWrapper,
 864        body: &EventRequestBody,
 865        first_event_at: chrono::DateTime<chrono::Utc>,
 866    ) -> Self {
 867        let semver = body.semver();
 868        let time =
 869            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 870
 871        Self {
 872            app_version: body.app_version.clone(),
 873            major: semver.map(|s| s.major as i32),
 874            minor: semver.map(|s| s.minor as i32),
 875            patch: semver.map(|s| s.patch as i32),
 876            release_channel: body.release_channel.clone().unwrap_or_default(),
 877            installation_id: body.installation_id.clone(),
 878            session_id: body.session_id.clone(),
 879            is_staff: body.is_staff,
 880            time: time.timestamp_millis(),
 881            setting: event.setting,
 882            value: event.value,
 883        }
 884    }
 885}
 886
/// One extension telemetry event flattened into a single ClickHouse row
/// (the struct derives `clickhouse::Row`).
///
/// Instances are built by `ExtensionEventRow::from_event`, which merges the
/// event with request-level metadata and, when available, the extension's
/// manifest metadata.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase
    /// App version string, cloned as-is from the request body.
    app_version: String,
    /// Major version component; `None` when `body.semver()` returns `None`.
    major: Option<i32>,
    /// Minor version component; `None` when `body.semver()` returns `None`.
    minor: Option<i32>,
    /// Patch version component; `None` when `body.semver()` returns `None`.
    patch: Option<i32>,
    /// Release channel from the request body; empty string when the body has none.
    release_channel: String,

    // ClientEventBase
    /// Installation id reported in the request body, if any.
    installation_id: Option<String>,
    /// Session id reported in the request body, if any.
    session_id: Option<String>,
    /// Staff flag reported in the request body, if any.
    is_staff: Option<bool>,
    /// Event time in Unix milliseconds: `first_event_at` plus the wrapper's
    /// `milliseconds_since_first_event` offset.
    time: i64,

    // ExtensionEventRow
    /// The `extension_id` field of the originating `ExtensionEvent`.
    extension_id: Arc<str>,
    /// The `version` field of the originating `ExtensionEvent`.
    extension_version: Arc<str>,
    /// `true` when no `ExtensionMetadata` was supplied for the event.
    dev: bool,
    /// Manifest schema version, when metadata was supplied and declares one.
    schema_version: Option<i32>,
    /// Manifest wasm API version rendered as a string, when metadata was
    /// supplied and declares one.
    wasm_api_version: Option<String>,
}
 909
 910impl ExtensionEventRow {
 911    fn from_event(
 912        event: ExtensionEvent,
 913        wrapper: &EventWrapper,
 914        body: &EventRequestBody,
 915        extension_metadata: Option<ExtensionMetadata>,
 916        first_event_at: chrono::DateTime<chrono::Utc>,
 917    ) -> Self {
 918        let semver = body.semver();
 919        let time =
 920            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 921
 922        Self {
 923            app_version: body.app_version.clone(),
 924            major: semver.map(|s| s.major as i32),
 925            minor: semver.map(|s| s.minor as i32),
 926            patch: semver.map(|s| s.patch as i32),
 927            release_channel: body.release_channel.clone().unwrap_or_default(),
 928            installation_id: body.installation_id.clone(),
 929            session_id: body.session_id.clone(),
 930            is_staff: body.is_staff,
 931            time: time.timestamp_millis(),
 932            extension_id: event.extension_id,
 933            extension_version: event.version,
 934            dev: extension_metadata.is_none(),
 935            schema_version: extension_metadata
 936                .as_ref()
 937                .and_then(|metadata| metadata.manifest.schema_version),
 938            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
 939                metadata
 940                    .manifest
 941                    .wasm_api_version
 942                    .as_ref()
 943                    .map(|version| version.to_string())
 944            }),
 945        }
 946    }
 947}
 948
/// One edit telemetry event flattened into a single ClickHouse row
/// (the struct derives `clickhouse::Row`).
///
/// Instances are built by `EditEventRow::from_event`, which merges the event
/// with request-level metadata and derives the edit period from the event's
/// time and duration.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    /// App version string, cloned as-is from the request body.
    app_version: String,
    /// Major version component; `None` when `body.semver()` returns `None`.
    major: Option<i32>,
    /// Minor version component; `None` when `body.semver()` returns `None`.
    minor: Option<i32>,
    /// Patch version component; `None` when `body.semver()` returns `None`.
    patch: Option<i32>,
    /// Release channel from the request body; empty string when the body has none.
    release_channel: String,

    // ClientEventBase
    /// Installation id reported in the request body, if any.
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    // The rename below reproduces that typo on purpose so the serialized
    // field name matches the existing column; do not "fix" the spelling here.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    /// Staff flag reported in the request body, if any.
    is_staff: Option<bool>,
    /// Event time in Unix milliseconds: `first_event_at` plus the wrapper's
    /// `milliseconds_since_first_event` offset.
    time: i64,

    // EditEventRow
    /// Start of the edit period in Unix milliseconds: the event time minus
    /// the event's `duration` (interpreted as milliseconds).
    period_start: i64,
    /// End of the edit period in Unix milliseconds; equal to `time`.
    period_end: i64,
    /// The `environment` field of the originating `EditEvent`.
    environment: String,
}
 971
 972impl EditEventRow {
 973    fn from_event(
 974        event: EditEvent,
 975        wrapper: &EventWrapper,
 976        body: &EventRequestBody,
 977        first_event_at: chrono::DateTime<chrono::Utc>,
 978    ) -> Self {
 979        let semver = body.semver();
 980        let time =
 981            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 982
 983        let period_start = time - chrono::Duration::milliseconds(event.duration);
 984        let period_end = time;
 985
 986        Self {
 987            app_version: body.app_version.clone(),
 988            major: semver.map(|s| s.major as i32),
 989            minor: semver.map(|s| s.minor as i32),
 990            patch: semver.map(|s| s.patch as i32),
 991            release_channel: body.release_channel.clone().unwrap_or_default(),
 992            installation_id: body.installation_id.clone(),
 993            session_id: body.session_id.clone(),
 994            is_staff: body.is_staff,
 995            time: time.timestamp_millis(),
 996            period_start: period_start.timestamp_millis(),
 997            period_end: period_end.timestamp_millis(),
 998            environment: event.environment,
 999        }
1000    }
1001}
1002
/// One action telemetry event flattened into a single ClickHouse row
/// (the struct derives `clickhouse::Row`).
///
/// Instances are built by `ActionEventRow::from_event`, which merges the
/// event's source/action pair with the metadata shared by the request body.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    /// App version string, cloned as-is from the request body.
    app_version: String,
    /// Major version component; `None` when `body.semver()` returns `None`.
    major: Option<i32>,
    /// Minor version component; `None` when `body.semver()` returns `None`.
    minor: Option<i32>,
    /// Patch version component; `None` when `body.semver()` returns `None`.
    patch: Option<i32>,
    /// Release channel from the request body; empty string when the body has none.
    release_channel: String,

    // ClientEventBase
    /// Installation id reported in the request body, if any.
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    // The rename below reproduces that typo on purpose so the serialized
    // field name matches the existing column; do not "fix" the spelling here.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    /// Staff flag reported in the request body, if any.
    is_staff: Option<bool>,
    /// Event time in Unix milliseconds: `first_event_at` plus the wrapper's
    /// `milliseconds_since_first_event` offset.
    time: i64,
    // ActionEventRow
    /// The `source` field of the originating `ActionEvent`.
    source: String,
    /// The `action` field of the originating `ActionEvent`.
    action: String,
}
1023
1024impl ActionEventRow {
1025    fn from_event(
1026        event: ActionEvent,
1027        wrapper: &EventWrapper,
1028        body: &EventRequestBody,
1029        first_event_at: chrono::DateTime<chrono::Utc>,
1030    ) -> Self {
1031        let semver = body.semver();
1032        let time =
1033            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1034
1035        Self {
1036            app_version: body.app_version.clone(),
1037            major: semver.map(|s| s.major as i32),
1038            minor: semver.map(|s| s.minor as i32),
1039            patch: semver.map(|s| s.patch as i32),
1040            release_channel: body.release_channel.clone().unwrap_or_default(),
1041            installation_id: body.installation_id.clone(),
1042            session_id: body.session_id.clone(),
1043            is_staff: body.is_staff,
1044            time: time.timestamp_millis(),
1045            source: event.source,
1046            action: event.action,
1047        }
1048    }
1049}