events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
  19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
  20};
  21
  22pub fn router() -> Router {
  23    Router::new()
  24        .route("/telemetry/events", post(post_events))
  25        .route("/telemetry/crashes", post(post_crash))
  26}
  27
  28pub struct ZedChecksumHeader(Vec<u8>);
  29
  30impl Header for ZedChecksumHeader {
  31    fn name() -> &'static HeaderName {
  32        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  33        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  34    }
  35
  36    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  37    where
  38        Self: Sized,
  39        I: Iterator<Item = &'i axum::http::HeaderValue>,
  40    {
  41        let checksum = values
  42            .next()
  43            .ok_or_else(axum::headers::Error::invalid)?
  44            .to_str()
  45            .map_err(|_| axum::headers::Error::invalid())?;
  46
  47        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  48        Ok(Self(bytes))
  49    }
  50
  51    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  52        unimplemented!()
  53    }
  54}
  55
  56pub struct CloudflareIpCountryHeader(String);
  57
  58impl Header for CloudflareIpCountryHeader {
  59    fn name() -> &'static HeaderName {
  60        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  61        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  62    }
  63
  64    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  65    where
  66        Self: Sized,
  67        I: Iterator<Item = &'i axum::http::HeaderValue>,
  68    {
  69        let country_code = values
  70            .next()
  71            .ok_or_else(axum::headers::Error::invalid)?
  72            .to_str()
  73            .map_err(|_| axum::headers::Error::invalid())?;
  74
  75        Ok(Self(country_code.to_string()))
  76    }
  77
  78    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  79        unimplemented!()
  80    }
  81}
  82
  83pub async fn post_crash(
  84    Extension(app): Extension<Arc<AppState>>,
  85    headers: HeaderMap,
  86    body: Bytes,
  87) -> Result<()> {
  88    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  89
  90    let report = IpsFile::parse(&body)?;
  91    let version_threshold = SemanticVersion::new(0, 123, 0);
  92
  93    let bundle_id = &report.header.bundle_id;
  94    let app_version = &report.app_version();
  95
  96    if bundle_id == "dev.zed.Zed-Dev" {
  97        log::error!("Crash uploads from {} are ignored.", bundle_id);
  98        return Ok(());
  99    }
 100
 101    if app_version.is_none() || app_version.unwrap() < version_threshold {
 102        log::error!(
 103            "Crash uploads from {} are ignored.",
 104            report.header.app_version
 105        );
 106        return Ok(());
 107    }
 108    let app_version = app_version.unwrap();
 109
 110    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 111        let response = blob_store_client
 112            .head_object()
 113            .bucket(CRASH_REPORTS_BUCKET)
 114            .key(report.header.incident_id.clone() + ".ips")
 115            .send()
 116            .await;
 117
 118        if response.is_ok() {
 119            log::info!("We've already uploaded this crash");
 120            return Ok(());
 121        }
 122
 123        blob_store_client
 124            .put_object()
 125            .bucket(CRASH_REPORTS_BUCKET)
 126            .key(report.header.incident_id.clone() + ".ips")
 127            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 128            .body(ByteStream::from(body.to_vec()))
 129            .send()
 130            .await
 131            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 132            .ok();
 133    }
 134
 135    let recent_panic_on: Option<i64> = headers
 136        .get("x-zed-panicked-on")
 137        .and_then(|h| h.to_str().ok())
 138        .and_then(|s| s.parse().ok());
 139    let mut recent_panic = None;
 140
 141    if let Some(recent_panic_on) = recent_panic_on {
 142        let crashed_at = match report.timestamp() {
 143            Ok(t) => Some(t),
 144            Err(e) => {
 145                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 146                None
 147            }
 148        };
 149        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 150            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 151        }
 152    }
 153
 154    let description = report.description(recent_panic);
 155    let summary = report.backtrace_summary();
 156
 157    tracing::error!(
 158        service = "client",
 159        version = %report.header.app_version,
 160        os_version = %report.header.os_version,
 161        bundle_id = %report.header.bundle_id,
 162        incident_id = %report.header.incident_id,
 163        description = %description,
 164        backtrace = %summary,
 165        "crash report");
 166
 167    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 168        let payload = slack::WebhookBody::new(|w| {
 169            w.add_section(|s| s.text(slack::Text::markdown(description)))
 170                .add_section(|s| {
 171                    s.add_field(slack::Text::markdown(format!(
 172                        "*Version:*\n{} ({})",
 173                        bundle_id, app_version
 174                    )))
 175                    .add_field({
 176                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 177                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 178                            hostname.strip_prefix("http://").unwrap_or_default()
 179                        });
 180
 181                        slack::Text::markdown(format!(
 182                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 183                            CRASH_REPORTS_BUCKET,
 184                            hostname,
 185                            report.header.incident_id,
 186                            report
 187                                .header
 188                                .incident_id
 189                                .chars()
 190                                .take(8)
 191                                .collect::<String>(),
 192                        ))
 193                    })
 194                })
 195                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 196        });
 197        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 198            log::error!("Failed to serialize payload to JSON: {err}");
 199            Error::Internal(anyhow!(err))
 200        })?;
 201
 202        reqwest::Client::new()
 203            .post(slack_panics_webhook)
 204            .header("Content-Type", "application/json")
 205            .body(payload_json)
 206            .send()
 207            .await
 208            .map_err(|err| {
 209                log::error!("Failed to send payload to Slack: {err}");
 210                Error::Internal(anyhow!(err))
 211            })?;
 212    }
 213
 214    Ok(())
 215}
 216
 217pub async fn post_events(
 218    Extension(app): Extension<Arc<AppState>>,
 219    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 220    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 221    body: Bytes,
 222) -> Result<()> {
 223    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 224        Err(Error::Http(
 225            StatusCode::NOT_IMPLEMENTED,
 226            "not supported".into(),
 227        ))?
 228    };
 229
 230    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
 231        return Err(Error::Http(
 232            StatusCode::INTERNAL_SERVER_ERROR,
 233            "events not enabled".into(),
 234        ))?;
 235    };
 236
 237    let mut summer = Sha256::new();
 238    summer.update(checksum_seed);
 239    summer.update(&body);
 240    summer.update(checksum_seed);
 241
 242    if &checksum != &summer.finalize()[..] {
 243        return Err(Error::Http(
 244            StatusCode::BAD_REQUEST,
 245            "invalid checksum".into(),
 246        ))?;
 247    }
 248
 249    let request_body: telemetry_events::EventRequestBody =
 250        serde_json::from_slice(&body).map_err(|err| {
 251            log::error!("can't parse event json: {err}");
 252            Error::Internal(anyhow!(err))
 253        })?;
 254
 255    let mut to_upload = ToUpload::default();
 256    let Some(last_event) = request_body.events.last() else {
 257        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 258    };
 259    let country_code = country_code_header.map(|h| h.0 .0);
 260
 261    let first_event_at = chrono::Utc::now()
 262        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 263
 264    for wrapper in &request_body.events {
 265        match &wrapper.event {
 266            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 267                event.clone(),
 268                &wrapper,
 269                &request_body,
 270                first_event_at,
 271                country_code.clone(),
 272            )),
 273            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
 274                event.clone(),
 275                &wrapper,
 276                &request_body,
 277                first_event_at,
 278                country_code.clone(),
 279            )),
 280            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 281                event.clone(),
 282                &wrapper,
 283                &request_body,
 284                first_event_at,
 285            )),
 286            Event::Assistant(event) => {
 287                to_upload
 288                    .assistant_events
 289                    .push(AssistantEventRow::from_event(
 290                        event.clone(),
 291                        &wrapper,
 292                        &request_body,
 293                        first_event_at,
 294                    ))
 295            }
 296            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 297                event.clone(),
 298                &wrapper,
 299                &request_body,
 300                first_event_at,
 301            )),
 302            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 303                event.clone(),
 304                &wrapper,
 305                &request_body,
 306                first_event_at,
 307            )),
 308            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 309                event.clone(),
 310                &wrapper,
 311                &request_body,
 312                first_event_at,
 313            )),
 314            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 315                event.clone(),
 316                &wrapper,
 317                &request_body,
 318                first_event_at,
 319            )),
 320            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 321                event.clone(),
 322                &wrapper,
 323                &request_body,
 324                first_event_at,
 325            )),
 326            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 327                event.clone(),
 328                &wrapper,
 329                &request_body,
 330                first_event_at,
 331            )),
 332            Event::Extension(event) => {
 333                let metadata = app
 334                    .db
 335                    .get_extension_version(&event.extension_id, &event.version)
 336                    .await?;
 337                to_upload
 338                    .extension_events
 339                    .push(ExtensionEventRow::from_event(
 340                        event.clone(),
 341                        &wrapper,
 342                        &request_body,
 343                        metadata,
 344                        first_event_at,
 345                    ))
 346            }
 347        }
 348    }
 349
 350    to_upload
 351        .upload(&clickhouse_client)
 352        .await
 353        .map_err(|err| Error::Internal(anyhow!(err)))?;
 354
 355    Ok(())
 356}
 357
 358#[derive(Default)]
 359struct ToUpload {
 360    editor_events: Vec<EditorEventRow>,
 361    copilot_events: Vec<CopilotEventRow>,
 362    assistant_events: Vec<AssistantEventRow>,
 363    call_events: Vec<CallEventRow>,
 364    cpu_events: Vec<CpuEventRow>,
 365    memory_events: Vec<MemoryEventRow>,
 366    app_events: Vec<AppEventRow>,
 367    setting_events: Vec<SettingEventRow>,
 368    extension_events: Vec<ExtensionEventRow>,
 369    edit_events: Vec<EditEventRow>,
 370    action_events: Vec<ActionEventRow>,
 371}
 372
 373impl ToUpload {
 374    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 375        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 376        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 377            .await
 378            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 379
 380        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
 381        Self::upload_to_table(
 382            COPILOT_EVENTS_TABLE,
 383            &self.copilot_events,
 384            clickhouse_client,
 385        )
 386        .await
 387        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
 388
 389        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 390        Self::upload_to_table(
 391            ASSISTANT_EVENTS_TABLE,
 392            &self.assistant_events,
 393            clickhouse_client,
 394        )
 395        .await
 396        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 397
 398        const CALL_EVENTS_TABLE: &str = "call_events";
 399        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 400            .await
 401            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 402
 403        const CPU_EVENTS_TABLE: &str = "cpu_events";
 404        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 405            .await
 406            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 407
 408        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 409        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 410            .await
 411            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 412
 413        const APP_EVENTS_TABLE: &str = "app_events";
 414        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 415            .await
 416            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 417
 418        const SETTING_EVENTS_TABLE: &str = "setting_events";
 419        Self::upload_to_table(
 420            SETTING_EVENTS_TABLE,
 421            &self.setting_events,
 422            clickhouse_client,
 423        )
 424        .await
 425        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 426
 427        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 428        Self::upload_to_table(
 429            EXTENSION_EVENTS_TABLE,
 430            &self.extension_events,
 431            clickhouse_client,
 432        )
 433        .await
 434        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 435
 436        const EDIT_EVENTS_TABLE: &str = "edit_events";
 437        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 438            .await
 439            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 440
 441        const ACTION_EVENTS_TABLE: &str = "action_events";
 442        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 443            .await
 444            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 445
 446        Ok(())
 447    }
 448
 449    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 450        table: &str,
 451        rows: &[T],
 452        clickhouse_client: &clickhouse::Client,
 453    ) -> anyhow::Result<()> {
 454        if !rows.is_empty() {
 455            let mut insert = clickhouse_client.insert(table)?;
 456
 457            for event in rows {
 458                insert.write(event).await?;
 459            }
 460
 461            insert.end().await?;
 462
 463            let event_count = rows.len();
 464            log::info!(
 465                "wrote {event_count} {event_specifier} to '{table}'",
 466                event_specifier = if event_count == 1 { "event" } else { "events" }
 467            );
 468        }
 469
 470        Ok(())
 471    }
 472}
 473
 474pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 475where
 476    S: Serializer,
 477{
 478    if country_code.len() != 2 {
 479        use serde::ser::Error;
 480        return Err(S::Error::custom(
 481            "country_code must be exactly 2 characters",
 482        ));
 483    }
 484
 485    let country_code = country_code.as_bytes();
 486
 487    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
 488}
 489
 490#[derive(Serialize, Debug, clickhouse::Row)]
 491pub struct EditorEventRow {
 492    pub installation_id: String,
 493    pub operation: String,
 494    pub app_version: String,
 495    pub file_extension: String,
 496    pub os_name: String,
 497    pub os_version: String,
 498    pub release_channel: String,
 499    pub signed_in: bool,
 500    pub vim_mode: bool,
 501    #[serde(serialize_with = "serialize_country_code")]
 502    pub country_code: String,
 503    pub region_code: String,
 504    pub city: String,
 505    pub time: i64,
 506    pub copilot_enabled: bool,
 507    pub copilot_enabled_for_language: bool,
 508    pub historical_event: bool,
 509    pub architecture: String,
 510    pub is_staff: Option<bool>,
 511    pub session_id: Option<String>,
 512    pub major: Option<i32>,
 513    pub minor: Option<i32>,
 514    pub patch: Option<i32>,
 515}
 516
 517impl EditorEventRow {
 518    fn from_event(
 519        event: EditorEvent,
 520        wrapper: &EventWrapper,
 521        body: &EventRequestBody,
 522        first_event_at: chrono::DateTime<chrono::Utc>,
 523        country_code: Option<String>,
 524    ) -> Self {
 525        let semver = body.semver();
 526        let time =
 527            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 528
 529        Self {
 530            app_version: body.app_version.clone(),
 531            major: semver.map(|v| v.major() as i32),
 532            minor: semver.map(|v| v.minor() as i32),
 533            patch: semver.map(|v| v.patch() as i32),
 534            release_channel: body.release_channel.clone().unwrap_or_default(),
 535            os_name: body.os_name.clone(),
 536            os_version: body.os_version.clone().unwrap_or_default(),
 537            architecture: body.architecture.clone(),
 538            installation_id: body.installation_id.clone().unwrap_or_default(),
 539            session_id: body.session_id.clone(),
 540            is_staff: body.is_staff,
 541            time: time.timestamp_millis(),
 542            operation: event.operation,
 543            file_extension: event.file_extension.unwrap_or_default(),
 544            signed_in: wrapper.signed_in,
 545            vim_mode: event.vim_mode,
 546            copilot_enabled: event.copilot_enabled,
 547            copilot_enabled_for_language: event.copilot_enabled_for_language,
 548            country_code: country_code.unwrap_or("XX".to_string()),
 549            region_code: "".to_string(),
 550            city: "".to_string(),
 551            historical_event: false,
 552        }
 553    }
 554}
 555
 556#[derive(Serialize, Debug, clickhouse::Row)]
 557pub struct CopilotEventRow {
 558    pub installation_id: String,
 559    pub suggestion_id: String,
 560    pub suggestion_accepted: bool,
 561    pub app_version: String,
 562    pub file_extension: String,
 563    pub os_name: String,
 564    pub os_version: String,
 565    pub release_channel: String,
 566    pub signed_in: bool,
 567    #[serde(serialize_with = "serialize_country_code")]
 568    pub country_code: String,
 569    pub region_code: String,
 570    pub city: String,
 571    pub time: i64,
 572    pub is_staff: Option<bool>,
 573    pub session_id: Option<String>,
 574    pub major: Option<i32>,
 575    pub minor: Option<i32>,
 576    pub patch: Option<i32>,
 577}
 578
 579impl CopilotEventRow {
 580    fn from_event(
 581        event: CopilotEvent,
 582        wrapper: &EventWrapper,
 583        body: &EventRequestBody,
 584        first_event_at: chrono::DateTime<chrono::Utc>,
 585        country_code: Option<String>,
 586    ) -> Self {
 587        let semver = body.semver();
 588        let time =
 589            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 590
 591        Self {
 592            app_version: body.app_version.clone(),
 593            major: semver.map(|v| v.major() as i32),
 594            minor: semver.map(|v| v.minor() as i32),
 595            patch: semver.map(|v| v.patch() as i32),
 596            release_channel: body.release_channel.clone().unwrap_or_default(),
 597            os_name: body.os_name.clone(),
 598            os_version: body.os_version.clone().unwrap_or_default(),
 599            installation_id: body.installation_id.clone().unwrap_or_default(),
 600            session_id: body.session_id.clone(),
 601            is_staff: body.is_staff,
 602            time: time.timestamp_millis(),
 603            file_extension: event.file_extension.unwrap_or_default(),
 604            signed_in: wrapper.signed_in,
 605            country_code: country_code.unwrap_or("XX".to_string()),
 606            region_code: "".to_string(),
 607            city: "".to_string(),
 608            suggestion_id: event.suggestion_id.unwrap_or_default(),
 609            suggestion_accepted: event.suggestion_accepted,
 610        }
 611    }
 612}
 613
 614#[derive(Serialize, Debug, clickhouse::Row)]
 615pub struct CallEventRow {
 616    // AppInfoBase
 617    app_version: String,
 618    major: Option<i32>,
 619    minor: Option<i32>,
 620    patch: Option<i32>,
 621    release_channel: String,
 622
 623    // ClientEventBase
 624    installation_id: String,
 625    session_id: Option<String>,
 626    is_staff: Option<bool>,
 627    time: i64,
 628
 629    // CallEventRow
 630    operation: String,
 631    room_id: Option<u64>,
 632    channel_id: Option<u64>,
 633}
 634
 635impl CallEventRow {
 636    fn from_event(
 637        event: CallEvent,
 638        wrapper: &EventWrapper,
 639        body: &EventRequestBody,
 640        first_event_at: chrono::DateTime<chrono::Utc>,
 641    ) -> Self {
 642        let semver = body.semver();
 643        let time =
 644            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 645
 646        Self {
 647            app_version: body.app_version.clone(),
 648            major: semver.map(|v| v.major() as i32),
 649            minor: semver.map(|v| v.minor() as i32),
 650            patch: semver.map(|v| v.patch() as i32),
 651            release_channel: body.release_channel.clone().unwrap_or_default(),
 652            installation_id: body.installation_id.clone().unwrap_or_default(),
 653            session_id: body.session_id.clone(),
 654            is_staff: body.is_staff,
 655            time: time.timestamp_millis(),
 656            operation: event.operation,
 657            room_id: event.room_id,
 658            channel_id: event.channel_id,
 659        }
 660    }
 661}
 662
 663#[derive(Serialize, Debug, clickhouse::Row)]
 664pub struct AssistantEventRow {
 665    // AppInfoBase
 666    app_version: String,
 667    major: Option<i32>,
 668    minor: Option<i32>,
 669    patch: Option<i32>,
 670    release_channel: String,
 671
 672    // ClientEventBase
 673    installation_id: Option<String>,
 674    session_id: Option<String>,
 675    is_staff: Option<bool>,
 676    time: i64,
 677
 678    // AssistantEventRow
 679    conversation_id: String,
 680    kind: String,
 681    model: String,
 682}
 683
 684impl AssistantEventRow {
 685    fn from_event(
 686        event: AssistantEvent,
 687        wrapper: &EventWrapper,
 688        body: &EventRequestBody,
 689        first_event_at: chrono::DateTime<chrono::Utc>,
 690    ) -> Self {
 691        let semver = body.semver();
 692        let time =
 693            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 694
 695        Self {
 696            app_version: body.app_version.clone(),
 697            major: semver.map(|v| v.major() as i32),
 698            minor: semver.map(|v| v.minor() as i32),
 699            patch: semver.map(|v| v.patch() as i32),
 700            release_channel: body.release_channel.clone().unwrap_or_default(),
 701            installation_id: body.installation_id.clone(),
 702            session_id: body.session_id.clone(),
 703            is_staff: body.is_staff,
 704            time: time.timestamp_millis(),
 705            conversation_id: event.conversation_id.unwrap_or_default(),
 706            kind: event.kind.to_string(),
 707            model: event.model,
 708        }
 709    }
 710}
 711
 712#[derive(Debug, clickhouse::Row, Serialize)]
 713pub struct CpuEventRow {
 714    pub installation_id: Option<String>,
 715    pub is_staff: Option<bool>,
 716    pub usage_as_percentage: f32,
 717    pub core_count: u32,
 718    pub app_version: String,
 719    pub release_channel: String,
 720    pub time: i64,
 721    pub session_id: Option<String>,
 722    // pub normalized_cpu_usage: f64, MATERIALIZED
 723    pub major: Option<i32>,
 724    pub minor: Option<i32>,
 725    pub patch: Option<i32>,
 726}
 727
 728impl CpuEventRow {
 729    fn from_event(
 730        event: CpuEvent,
 731        wrapper: &EventWrapper,
 732        body: &EventRequestBody,
 733        first_event_at: chrono::DateTime<chrono::Utc>,
 734    ) -> Self {
 735        let semver = body.semver();
 736        let time =
 737            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 738
 739        Self {
 740            app_version: body.app_version.clone(),
 741            major: semver.map(|v| v.major() as i32),
 742            minor: semver.map(|v| v.minor() as i32),
 743            patch: semver.map(|v| v.patch() as i32),
 744            release_channel: body.release_channel.clone().unwrap_or_default(),
 745            installation_id: body.installation_id.clone(),
 746            session_id: body.session_id.clone(),
 747            is_staff: body.is_staff,
 748            time: time.timestamp_millis(),
 749            usage_as_percentage: event.usage_as_percentage,
 750            core_count: event.core_count,
 751        }
 752    }
 753}
 754
 755#[derive(Serialize, Debug, clickhouse::Row)]
 756pub struct MemoryEventRow {
 757    // AppInfoBase
 758    app_version: String,
 759    major: Option<i32>,
 760    minor: Option<i32>,
 761    patch: Option<i32>,
 762    release_channel: String,
 763
 764    // ClientEventBase
 765    installation_id: Option<String>,
 766    session_id: Option<String>,
 767    is_staff: Option<bool>,
 768    time: i64,
 769
 770    // MemoryEventRow
 771    memory_in_bytes: u64,
 772    virtual_memory_in_bytes: u64,
 773}
 774
 775impl MemoryEventRow {
 776    fn from_event(
 777        event: MemoryEvent,
 778        wrapper: &EventWrapper,
 779        body: &EventRequestBody,
 780        first_event_at: chrono::DateTime<chrono::Utc>,
 781    ) -> Self {
 782        let semver = body.semver();
 783        let time =
 784            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 785
 786        Self {
 787            app_version: body.app_version.clone(),
 788            major: semver.map(|v| v.major() as i32),
 789            minor: semver.map(|v| v.minor() as i32),
 790            patch: semver.map(|v| v.patch() as i32),
 791            release_channel: body.release_channel.clone().unwrap_or_default(),
 792            installation_id: body.installation_id.clone(),
 793            session_id: body.session_id.clone(),
 794            is_staff: body.is_staff,
 795            time: time.timestamp_millis(),
 796            memory_in_bytes: event.memory_in_bytes,
 797            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 798        }
 799    }
 800}
 801
 802#[derive(Serialize, Debug, clickhouse::Row)]
 803pub struct AppEventRow {
 804    // AppInfoBase
 805    app_version: String,
 806    major: Option<i32>,
 807    minor: Option<i32>,
 808    patch: Option<i32>,
 809    release_channel: String,
 810
 811    // ClientEventBase
 812    installation_id: Option<String>,
 813    session_id: Option<String>,
 814    is_staff: Option<bool>,
 815    time: i64,
 816
 817    // AppEventRow
 818    operation: String,
 819}
 820
 821impl AppEventRow {
 822    fn from_event(
 823        event: AppEvent,
 824        wrapper: &EventWrapper,
 825        body: &EventRequestBody,
 826        first_event_at: chrono::DateTime<chrono::Utc>,
 827    ) -> Self {
 828        let semver = body.semver();
 829        let time =
 830            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 831
 832        Self {
 833            app_version: body.app_version.clone(),
 834            major: semver.map(|v| v.major() as i32),
 835            minor: semver.map(|v| v.minor() as i32),
 836            patch: semver.map(|v| v.patch() as i32),
 837            release_channel: body.release_channel.clone().unwrap_or_default(),
 838            installation_id: body.installation_id.clone(),
 839            session_id: body.session_id.clone(),
 840            is_staff: body.is_staff,
 841            time: time.timestamp_millis(),
 842            operation: event.operation,
 843        }
 844    }
 845}
 846
 847#[derive(Serialize, Debug, clickhouse::Row)]
 848pub struct SettingEventRow {
 849    // AppInfoBase
 850    app_version: String,
 851    major: Option<i32>,
 852    minor: Option<i32>,
 853    patch: Option<i32>,
 854    release_channel: String,
 855
 856    // ClientEventBase
 857    installation_id: Option<String>,
 858    session_id: Option<String>,
 859    is_staff: Option<bool>,
 860    time: i64,
 861    // SettingEventRow
 862    setting: String,
 863    value: String,
 864}
 865
 866impl SettingEventRow {
 867    fn from_event(
 868        event: SettingEvent,
 869        wrapper: &EventWrapper,
 870        body: &EventRequestBody,
 871        first_event_at: chrono::DateTime<chrono::Utc>,
 872    ) -> Self {
 873        let semver = body.semver();
 874        let time =
 875            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 876
 877        Self {
 878            app_version: body.app_version.clone(),
 879            major: semver.map(|v| v.major() as i32),
 880            minor: semver.map(|v| v.minor() as i32),
 881            patch: semver.map(|v| v.patch() as i32),
 882            release_channel: body.release_channel.clone().unwrap_or_default(),
 883            installation_id: body.installation_id.clone(),
 884            session_id: body.session_id.clone(),
 885            is_staff: body.is_staff,
 886            time: time.timestamp_millis(),
 887            setting: event.setting,
 888            value: event.value,
 889        }
 890    }
 891}
 892
 893#[derive(Serialize, Debug, clickhouse::Row)]
 894pub struct ExtensionEventRow {
 895    // AppInfoBase
 896    app_version: String,
 897    major: Option<i32>,
 898    minor: Option<i32>,
 899    patch: Option<i32>,
 900    release_channel: String,
 901
 902    // ClientEventBase
 903    installation_id: Option<String>,
 904    session_id: Option<String>,
 905    is_staff: Option<bool>,
 906    time: i64,
 907
 908    // ExtensionEventRow
 909    extension_id: Arc<str>,
 910    extension_version: Arc<str>,
 911    dev: bool,
 912    schema_version: Option<i32>,
 913    wasm_api_version: Option<String>,
 914}
 915
 916impl ExtensionEventRow {
 917    fn from_event(
 918        event: ExtensionEvent,
 919        wrapper: &EventWrapper,
 920        body: &EventRequestBody,
 921        extension_metadata: Option<ExtensionMetadata>,
 922        first_event_at: chrono::DateTime<chrono::Utc>,
 923    ) -> Self {
 924        let semver = body.semver();
 925        let time =
 926            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 927
 928        Self {
 929            app_version: body.app_version.clone(),
 930            major: semver.map(|v| v.major() as i32),
 931            minor: semver.map(|v| v.minor() as i32),
 932            patch: semver.map(|v| v.patch() as i32),
 933            release_channel: body.release_channel.clone().unwrap_or_default(),
 934            installation_id: body.installation_id.clone(),
 935            session_id: body.session_id.clone(),
 936            is_staff: body.is_staff,
 937            time: time.timestamp_millis(),
 938            extension_id: event.extension_id,
 939            extension_version: event.version,
 940            dev: extension_metadata.is_none(),
 941            schema_version: extension_metadata
 942                .as_ref()
 943                .and_then(|metadata| metadata.manifest.schema_version),
 944            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
 945                metadata
 946                    .manifest
 947                    .wasm_api_version
 948                    .as_ref()
 949                    .map(|version| version.to_string())
 950            }),
 951        }
 952    }
 953}
 954
 955#[derive(Serialize, Debug, clickhouse::Row)]
 956pub struct EditEventRow {
 957    // AppInfoBase
 958    app_version: String,
 959    major: Option<i32>,
 960    minor: Option<i32>,
 961    patch: Option<i32>,
 962    release_channel: String,
 963
 964    // ClientEventBase
 965    installation_id: Option<String>,
 966    // Note: This column name has a typo in the ClickHouse table.
 967    #[serde(rename = "sesssion_id")]
 968    session_id: Option<String>,
 969    is_staff: Option<bool>,
 970    time: i64,
 971
 972    // EditEventRow
 973    period_start: i64,
 974    period_end: i64,
 975    environment: String,
 976}
 977
 978impl EditEventRow {
 979    fn from_event(
 980        event: EditEvent,
 981        wrapper: &EventWrapper,
 982        body: &EventRequestBody,
 983        first_event_at: chrono::DateTime<chrono::Utc>,
 984    ) -> Self {
 985        let semver = body.semver();
 986        let time =
 987            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 988
 989        let period_start = time - chrono::Duration::milliseconds(event.duration);
 990        let period_end = time;
 991
 992        Self {
 993            app_version: body.app_version.clone(),
 994            major: semver.map(|v| v.major() as i32),
 995            minor: semver.map(|v| v.minor() as i32),
 996            patch: semver.map(|v| v.patch() as i32),
 997            release_channel: body.release_channel.clone().unwrap_or_default(),
 998            installation_id: body.installation_id.clone(),
 999            session_id: body.session_id.clone(),
1000            is_staff: body.is_staff,
1001            time: time.timestamp_millis(),
1002            period_start: period_start.timestamp_millis(),
1003            period_end: period_end.timestamp_millis(),
1004            environment: event.environment,
1005        }
1006    }
1007}
1008
1009#[derive(Serialize, Debug, clickhouse::Row)]
1010pub struct ActionEventRow {
1011    // AppInfoBase
1012    app_version: String,
1013    major: Option<i32>,
1014    minor: Option<i32>,
1015    patch: Option<i32>,
1016    release_channel: String,
1017
1018    // ClientEventBase
1019    installation_id: Option<String>,
1020    // Note: This column name has a typo in the ClickHouse table.
1021    #[serde(rename = "sesssion_id")]
1022    session_id: Option<String>,
1023    is_staff: Option<bool>,
1024    time: i64,
1025    // ActionEventRow
1026    source: String,
1027    action: String,
1028}
1029
1030impl ActionEventRow {
1031    fn from_event(
1032        event: ActionEvent,
1033        wrapper: &EventWrapper,
1034        body: &EventRequestBody,
1035        first_event_at: chrono::DateTime<chrono::Utc>,
1036    ) -> Self {
1037        let semver = body.semver();
1038        let time =
1039            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1040
1041        Self {
1042            app_version: body.app_version.clone(),
1043            major: semver.map(|v| v.major() as i32),
1044            minor: semver.map(|v| v.minor() as i32),
1045            patch: semver.map(|v| v.patch() as i32),
1046            release_channel: body.release_channel.clone().unwrap_or_default(),
1047            installation_id: body.installation_id.clone(),
1048            session_id: body.session_id.clone(),
1049            is_staff: body.is_staff,
1050            time: time.timestamp_millis(),
1051            source: event.source,
1052            action: event.action,
1053        }
1054    }
1055}