events.rs

   1use super::ips_file::IpsFile;
   2use crate::{api::slack, AppState, Error, Result};
   3use anyhow::{anyhow, Context};
   4use aws_sdk_s3::primitives::ByteStream;
   5use axum::{
   6    body::Bytes,
   7    headers::Header,
   8    http::{HeaderMap, HeaderName, StatusCode},
   9    routing::post,
  10    Extension, Router, TypedHeader,
  11};
  12use rpc::ExtensionMetadata;
  13use semantic_version::SemanticVersion;
  14use serde::{Serialize, Serializer};
  15use sha2::{Digest, Sha256};
  16use std::sync::{Arc, OnceLock};
  17use telemetry_events::{
  18    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
  19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent, SettingEvent,
  20};
  21
  22pub fn router() -> Router {
  23    Router::new()
  24        .route("/telemetry/events", post(post_events))
  25        .route("/telemetry/crashes", post(post_crash))
  26}
  27
  28pub struct ZedChecksumHeader(Vec<u8>);
  29
  30impl Header for ZedChecksumHeader {
  31    fn name() -> &'static HeaderName {
  32        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
  33        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
  34    }
  35
  36    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  37    where
  38        Self: Sized,
  39        I: Iterator<Item = &'i axum::http::HeaderValue>,
  40    {
  41        let checksum = values
  42            .next()
  43            .ok_or_else(axum::headers::Error::invalid)?
  44            .to_str()
  45            .map_err(|_| axum::headers::Error::invalid())?;
  46
  47        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
  48        Ok(Self(bytes))
  49    }
  50
  51    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  52        unimplemented!()
  53    }
  54}
  55
  56pub struct CloudflareIpCountryHeader(String);
  57
  58impl Header for CloudflareIpCountryHeader {
  59    fn name() -> &'static HeaderName {
  60        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
  61        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
  62    }
  63
  64    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
  65    where
  66        Self: Sized,
  67        I: Iterator<Item = &'i axum::http::HeaderValue>,
  68    {
  69        let country_code = values
  70            .next()
  71            .ok_or_else(axum::headers::Error::invalid)?
  72            .to_str()
  73            .map_err(|_| axum::headers::Error::invalid())?;
  74
  75        Ok(Self(country_code.to_string()))
  76    }
  77
  78    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
  79        unimplemented!()
  80    }
  81}
  82
  83pub async fn post_crash(
  84    Extension(app): Extension<Arc<AppState>>,
  85    headers: HeaderMap,
  86    body: Bytes,
  87) -> Result<()> {
  88    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
  89
  90    let report = IpsFile::parse(&body)?;
  91    let version_threshold = SemanticVersion::new(0, 123, 0);
  92
  93    let bundle_id = &report.header.bundle_id;
  94    let app_version = &report.app_version();
  95
  96    if bundle_id == "dev.zed.Zed-Dev" {
  97        log::error!("Crash uploads from {} are ignored.", bundle_id);
  98        return Ok(());
  99    }
 100
 101    if app_version.is_none() || app_version.unwrap() < version_threshold {
 102        log::error!(
 103            "Crash uploads from {} are ignored.",
 104            report.header.app_version
 105        );
 106        return Ok(());
 107    }
 108    let app_version = app_version.unwrap();
 109
 110    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
 111        let response = blob_store_client
 112            .head_object()
 113            .bucket(CRASH_REPORTS_BUCKET)
 114            .key(report.header.incident_id.clone() + ".ips")
 115            .send()
 116            .await;
 117
 118        if response.is_ok() {
 119            log::info!("We've already uploaded this crash");
 120            return Ok(());
 121        }
 122
 123        blob_store_client
 124            .put_object()
 125            .bucket(CRASH_REPORTS_BUCKET)
 126            .key(report.header.incident_id.clone() + ".ips")
 127            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
 128            .body(ByteStream::from(body.to_vec()))
 129            .send()
 130            .await
 131            .map_err(|e| log::error!("Failed to upload crash: {}", e))
 132            .ok();
 133    }
 134
 135    let recent_panic_on: Option<i64> = headers
 136        .get("x-zed-panicked-on")
 137        .and_then(|h| h.to_str().ok())
 138        .and_then(|s| s.parse().ok());
 139
 140    let installation_id = headers
 141        .get("x-zed-installation-id")
 142        .and_then(|h| h.to_str().ok())
 143        .map(|s| s.to_string())
 144        .unwrap_or_default();
 145
 146    let mut recent_panic = None;
 147
 148    if let Some(recent_panic_on) = recent_panic_on {
 149        let crashed_at = match report.timestamp() {
 150            Ok(t) => Some(t),
 151            Err(e) => {
 152                log::error!("Can't parse {}: {}", report.header.timestamp, e);
 153                None
 154            }
 155        };
 156        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
 157            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
 158        }
 159    }
 160
 161    let description = report.description(recent_panic);
 162    let summary = report.backtrace_summary();
 163
 164    tracing::error!(
 165        service = "client",
 166        version = %report.header.app_version,
 167        os_version = %report.header.os_version,
 168        bundle_id = %report.header.bundle_id,
 169        incident_id = %report.header.incident_id,
 170        installation_id = %installation_id,
 171        description = %description,
 172        backtrace = %summary,
 173        "crash report");
 174
 175    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
 176        let payload = slack::WebhookBody::new(|w| {
 177            w.add_section(|s| s.text(slack::Text::markdown(description)))
 178                .add_section(|s| {
 179                    s.add_field(slack::Text::markdown(format!(
 180                        "*Version:*\n{} ({})",
 181                        bundle_id, app_version
 182                    )))
 183                    .add_field({
 184                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
 185                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
 186                            hostname.strip_prefix("http://").unwrap_or_default()
 187                        });
 188
 189                        slack::Text::markdown(format!(
 190                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
 191                            CRASH_REPORTS_BUCKET,
 192                            hostname,
 193                            report.header.incident_id,
 194                            report
 195                                .header
 196                                .incident_id
 197                                .chars()
 198                                .take(8)
 199                                .collect::<String>(),
 200                        ))
 201                    })
 202                })
 203                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
 204        });
 205        let payload_json = serde_json::to_string(&payload).map_err(|err| {
 206            log::error!("Failed to serialize payload to JSON: {err}");
 207            Error::Internal(anyhow!(err))
 208        })?;
 209
 210        reqwest::Client::new()
 211            .post(slack_panics_webhook)
 212            .header("Content-Type", "application/json")
 213            .body(payload_json)
 214            .send()
 215            .await
 216            .map_err(|err| {
 217                log::error!("Failed to send payload to Slack: {err}");
 218                Error::Internal(anyhow!(err))
 219            })?;
 220    }
 221
 222    Ok(())
 223}
 224
 225pub async fn post_events(
 226    Extension(app): Extension<Arc<AppState>>,
 227    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 228    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 229    body: Bytes,
 230) -> Result<()> {
 231    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 232        Err(Error::Http(
 233            StatusCode::NOT_IMPLEMENTED,
 234            "not supported".into(),
 235        ))?
 236    };
 237
 238    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
 239        return Err(Error::Http(
 240            StatusCode::INTERNAL_SERVER_ERROR,
 241            "events not enabled".into(),
 242        ))?;
 243    };
 244
 245    let mut summer = Sha256::new();
 246    summer.update(checksum_seed);
 247    summer.update(&body);
 248    summer.update(checksum_seed);
 249
 250    if &checksum != &summer.finalize()[..] {
 251        return Err(Error::Http(
 252            StatusCode::BAD_REQUEST,
 253            "invalid checksum".into(),
 254        ))?;
 255    }
 256
 257    let request_body: telemetry_events::EventRequestBody =
 258        serde_json::from_slice(&body).map_err(|err| {
 259            log::error!("can't parse event json: {err}");
 260            Error::Internal(anyhow!(err))
 261        })?;
 262
 263    let mut to_upload = ToUpload::default();
 264    let Some(last_event) = request_body.events.last() else {
 265        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
 266    };
 267    let country_code = country_code_header.map(|h| h.0 .0);
 268
 269    let first_event_at = chrono::Utc::now()
 270        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 271
 272    for wrapper in &request_body.events {
 273        match &wrapper.event {
 274            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
 275                event.clone(),
 276                &wrapper,
 277                &request_body,
 278                first_event_at,
 279                country_code.clone(),
 280            )),
 281            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
 282                event.clone(),
 283                &wrapper,
 284                &request_body,
 285                first_event_at,
 286                country_code.clone(),
 287            )),
 288            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
 289                event.clone(),
 290                &wrapper,
 291                &request_body,
 292                first_event_at,
 293            )),
 294            Event::Assistant(event) => {
 295                to_upload
 296                    .assistant_events
 297                    .push(AssistantEventRow::from_event(
 298                        event.clone(),
 299                        &wrapper,
 300                        &request_body,
 301                        first_event_at,
 302                    ))
 303            }
 304            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
 305                event.clone(),
 306                &wrapper,
 307                &request_body,
 308                first_event_at,
 309            )),
 310            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
 311                event.clone(),
 312                &wrapper,
 313                &request_body,
 314                first_event_at,
 315            )),
 316            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
 317                event.clone(),
 318                &wrapper,
 319                &request_body,
 320                first_event_at,
 321            )),
 322            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
 323                event.clone(),
 324                &wrapper,
 325                &request_body,
 326                first_event_at,
 327            )),
 328            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
 329                event.clone(),
 330                &wrapper,
 331                &request_body,
 332                first_event_at,
 333            )),
 334            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
 335                event.clone(),
 336                &wrapper,
 337                &request_body,
 338                first_event_at,
 339            )),
 340            Event::Extension(event) => {
 341                let metadata = app
 342                    .db
 343                    .get_extension_version(&event.extension_id, &event.version)
 344                    .await?;
 345                to_upload
 346                    .extension_events
 347                    .push(ExtensionEventRow::from_event(
 348                        event.clone(),
 349                        &wrapper,
 350                        &request_body,
 351                        metadata,
 352                        first_event_at,
 353                    ))
 354            }
 355        }
 356    }
 357
 358    to_upload
 359        .upload(&clickhouse_client)
 360        .await
 361        .map_err(|err| Error::Internal(anyhow!(err)))?;
 362
 363    Ok(())
 364}
 365
 366#[derive(Default)]
 367struct ToUpload {
 368    editor_events: Vec<EditorEventRow>,
 369    copilot_events: Vec<CopilotEventRow>,
 370    assistant_events: Vec<AssistantEventRow>,
 371    call_events: Vec<CallEventRow>,
 372    cpu_events: Vec<CpuEventRow>,
 373    memory_events: Vec<MemoryEventRow>,
 374    app_events: Vec<AppEventRow>,
 375    setting_events: Vec<SettingEventRow>,
 376    extension_events: Vec<ExtensionEventRow>,
 377    edit_events: Vec<EditEventRow>,
 378    action_events: Vec<ActionEventRow>,
 379}
 380
 381impl ToUpload {
 382    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
 383        const EDITOR_EVENTS_TABLE: &str = "editor_events";
 384        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
 385            .await
 386            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
 387
 388        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
 389        Self::upload_to_table(
 390            COPILOT_EVENTS_TABLE,
 391            &self.copilot_events,
 392            clickhouse_client,
 393        )
 394        .await
 395        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
 396
 397        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
 398        Self::upload_to_table(
 399            ASSISTANT_EVENTS_TABLE,
 400            &self.assistant_events,
 401            clickhouse_client,
 402        )
 403        .await
 404        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
 405
 406        const CALL_EVENTS_TABLE: &str = "call_events";
 407        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
 408            .await
 409            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
 410
 411        const CPU_EVENTS_TABLE: &str = "cpu_events";
 412        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
 413            .await
 414            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
 415
 416        const MEMORY_EVENTS_TABLE: &str = "memory_events";
 417        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
 418            .await
 419            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
 420
 421        const APP_EVENTS_TABLE: &str = "app_events";
 422        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
 423            .await
 424            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
 425
 426        const SETTING_EVENTS_TABLE: &str = "setting_events";
 427        Self::upload_to_table(
 428            SETTING_EVENTS_TABLE,
 429            &self.setting_events,
 430            clickhouse_client,
 431        )
 432        .await
 433        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
 434
 435        const EXTENSION_EVENTS_TABLE: &str = "extension_events";
 436        Self::upload_to_table(
 437            EXTENSION_EVENTS_TABLE,
 438            &self.extension_events,
 439            clickhouse_client,
 440        )
 441        .await
 442        .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?;
 443
 444        const EDIT_EVENTS_TABLE: &str = "edit_events";
 445        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
 446            .await
 447            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
 448
 449        const ACTION_EVENTS_TABLE: &str = "action_events";
 450        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
 451            .await
 452            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 453
 454        Ok(())
 455    }
 456
 457    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
 458        table: &str,
 459        rows: &[T],
 460        clickhouse_client: &clickhouse::Client,
 461    ) -> anyhow::Result<()> {
 462        if !rows.is_empty() {
 463            let mut insert = clickhouse_client.insert(table)?;
 464
 465            for event in rows {
 466                insert.write(event).await?;
 467            }
 468
 469            insert.end().await?;
 470
 471            let event_count = rows.len();
 472            log::info!(
 473                "wrote {event_count} {event_specifier} to '{table}'",
 474                event_specifier = if event_count == 1 { "event" } else { "events" }
 475            );
 476        }
 477
 478        Ok(())
 479    }
 480}
 481
 482pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
 483where
 484    S: Serializer,
 485{
 486    if country_code.len() != 2 {
 487        use serde::ser::Error;
 488        return Err(S::Error::custom(
 489            "country_code must be exactly 2 characters",
 490        ));
 491    }
 492
 493    let country_code = country_code.as_bytes();
 494
 495    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
 496}
 497
 498#[derive(Serialize, Debug, clickhouse::Row)]
 499pub struct EditorEventRow {
 500    pub installation_id: String,
 501    pub operation: String,
 502    pub app_version: String,
 503    pub file_extension: String,
 504    pub os_name: String,
 505    pub os_version: String,
 506    pub release_channel: String,
 507    pub signed_in: bool,
 508    pub vim_mode: bool,
 509    #[serde(serialize_with = "serialize_country_code")]
 510    pub country_code: String,
 511    pub region_code: String,
 512    pub city: String,
 513    pub time: i64,
 514    pub copilot_enabled: bool,
 515    pub copilot_enabled_for_language: bool,
 516    pub historical_event: bool,
 517    pub architecture: String,
 518    pub is_staff: Option<bool>,
 519    pub session_id: Option<String>,
 520    pub major: Option<i32>,
 521    pub minor: Option<i32>,
 522    pub patch: Option<i32>,
 523}
 524
 525impl EditorEventRow {
 526    fn from_event(
 527        event: EditorEvent,
 528        wrapper: &EventWrapper,
 529        body: &EventRequestBody,
 530        first_event_at: chrono::DateTime<chrono::Utc>,
 531        country_code: Option<String>,
 532    ) -> Self {
 533        let semver = body.semver();
 534        let time =
 535            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 536
 537        Self {
 538            app_version: body.app_version.clone(),
 539            major: semver.map(|v| v.major() as i32),
 540            minor: semver.map(|v| v.minor() as i32),
 541            patch: semver.map(|v| v.patch() as i32),
 542            release_channel: body.release_channel.clone().unwrap_or_default(),
 543            os_name: body.os_name.clone(),
 544            os_version: body.os_version.clone().unwrap_or_default(),
 545            architecture: body.architecture.clone(),
 546            installation_id: body.installation_id.clone().unwrap_or_default(),
 547            session_id: body.session_id.clone(),
 548            is_staff: body.is_staff,
 549            time: time.timestamp_millis(),
 550            operation: event.operation,
 551            file_extension: event.file_extension.unwrap_or_default(),
 552            signed_in: wrapper.signed_in,
 553            vim_mode: event.vim_mode,
 554            copilot_enabled: event.copilot_enabled,
 555            copilot_enabled_for_language: event.copilot_enabled_for_language,
 556            country_code: country_code.unwrap_or("XX".to_string()),
 557            region_code: "".to_string(),
 558            city: "".to_string(),
 559            historical_event: false,
 560        }
 561    }
 562}
 563
 564#[derive(Serialize, Debug, clickhouse::Row)]
 565pub struct CopilotEventRow {
 566    pub installation_id: String,
 567    pub suggestion_id: String,
 568    pub suggestion_accepted: bool,
 569    pub app_version: String,
 570    pub file_extension: String,
 571    pub os_name: String,
 572    pub os_version: String,
 573    pub release_channel: String,
 574    pub signed_in: bool,
 575    #[serde(serialize_with = "serialize_country_code")]
 576    pub country_code: String,
 577    pub region_code: String,
 578    pub city: String,
 579    pub time: i64,
 580    pub is_staff: Option<bool>,
 581    pub session_id: Option<String>,
 582    pub major: Option<i32>,
 583    pub minor: Option<i32>,
 584    pub patch: Option<i32>,
 585}
 586
 587impl CopilotEventRow {
 588    fn from_event(
 589        event: CopilotEvent,
 590        wrapper: &EventWrapper,
 591        body: &EventRequestBody,
 592        first_event_at: chrono::DateTime<chrono::Utc>,
 593        country_code: Option<String>,
 594    ) -> Self {
 595        let semver = body.semver();
 596        let time =
 597            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 598
 599        Self {
 600            app_version: body.app_version.clone(),
 601            major: semver.map(|v| v.major() as i32),
 602            minor: semver.map(|v| v.minor() as i32),
 603            patch: semver.map(|v| v.patch() as i32),
 604            release_channel: body.release_channel.clone().unwrap_or_default(),
 605            os_name: body.os_name.clone(),
 606            os_version: body.os_version.clone().unwrap_or_default(),
 607            installation_id: body.installation_id.clone().unwrap_or_default(),
 608            session_id: body.session_id.clone(),
 609            is_staff: body.is_staff,
 610            time: time.timestamp_millis(),
 611            file_extension: event.file_extension.unwrap_or_default(),
 612            signed_in: wrapper.signed_in,
 613            country_code: country_code.unwrap_or("XX".to_string()),
 614            region_code: "".to_string(),
 615            city: "".to_string(),
 616            suggestion_id: event.suggestion_id.unwrap_or_default(),
 617            suggestion_accepted: event.suggestion_accepted,
 618        }
 619    }
 620}
 621
 622#[derive(Serialize, Debug, clickhouse::Row)]
 623pub struct CallEventRow {
 624    // AppInfoBase
 625    app_version: String,
 626    major: Option<i32>,
 627    minor: Option<i32>,
 628    patch: Option<i32>,
 629    release_channel: String,
 630
 631    // ClientEventBase
 632    installation_id: String,
 633    session_id: Option<String>,
 634    is_staff: Option<bool>,
 635    time: i64,
 636
 637    // CallEventRow
 638    operation: String,
 639    room_id: Option<u64>,
 640    channel_id: Option<u64>,
 641}
 642
 643impl CallEventRow {
 644    fn from_event(
 645        event: CallEvent,
 646        wrapper: &EventWrapper,
 647        body: &EventRequestBody,
 648        first_event_at: chrono::DateTime<chrono::Utc>,
 649    ) -> Self {
 650        let semver = body.semver();
 651        let time =
 652            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 653
 654        Self {
 655            app_version: body.app_version.clone(),
 656            major: semver.map(|v| v.major() as i32),
 657            minor: semver.map(|v| v.minor() as i32),
 658            patch: semver.map(|v| v.patch() as i32),
 659            release_channel: body.release_channel.clone().unwrap_or_default(),
 660            installation_id: body.installation_id.clone().unwrap_or_default(),
 661            session_id: body.session_id.clone(),
 662            is_staff: body.is_staff,
 663            time: time.timestamp_millis(),
 664            operation: event.operation,
 665            room_id: event.room_id,
 666            channel_id: event.channel_id,
 667        }
 668    }
 669}
 670
 671#[derive(Serialize, Debug, clickhouse::Row)]
 672pub struct AssistantEventRow {
 673    // AppInfoBase
 674    app_version: String,
 675    major: Option<i32>,
 676    minor: Option<i32>,
 677    patch: Option<i32>,
 678    release_channel: String,
 679
 680    // ClientEventBase
 681    installation_id: Option<String>,
 682    session_id: Option<String>,
 683    is_staff: Option<bool>,
 684    time: i64,
 685
 686    // AssistantEventRow
 687    conversation_id: String,
 688    kind: String,
 689    model: String,
 690}
 691
 692impl AssistantEventRow {
 693    fn from_event(
 694        event: AssistantEvent,
 695        wrapper: &EventWrapper,
 696        body: &EventRequestBody,
 697        first_event_at: chrono::DateTime<chrono::Utc>,
 698    ) -> Self {
 699        let semver = body.semver();
 700        let time =
 701            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 702
 703        Self {
 704            app_version: body.app_version.clone(),
 705            major: semver.map(|v| v.major() as i32),
 706            minor: semver.map(|v| v.minor() as i32),
 707            patch: semver.map(|v| v.patch() as i32),
 708            release_channel: body.release_channel.clone().unwrap_or_default(),
 709            installation_id: body.installation_id.clone(),
 710            session_id: body.session_id.clone(),
 711            is_staff: body.is_staff,
 712            time: time.timestamp_millis(),
 713            conversation_id: event.conversation_id.unwrap_or_default(),
 714            kind: event.kind.to_string(),
 715            model: event.model,
 716        }
 717    }
 718}
 719
 720#[derive(Debug, clickhouse::Row, Serialize)]
 721pub struct CpuEventRow {
 722    pub installation_id: Option<String>,
 723    pub is_staff: Option<bool>,
 724    pub usage_as_percentage: f32,
 725    pub core_count: u32,
 726    pub app_version: String,
 727    pub release_channel: String,
 728    pub time: i64,
 729    pub session_id: Option<String>,
 730    // pub normalized_cpu_usage: f64, MATERIALIZED
 731    pub major: Option<i32>,
 732    pub minor: Option<i32>,
 733    pub patch: Option<i32>,
 734}
 735
 736impl CpuEventRow {
 737    fn from_event(
 738        event: CpuEvent,
 739        wrapper: &EventWrapper,
 740        body: &EventRequestBody,
 741        first_event_at: chrono::DateTime<chrono::Utc>,
 742    ) -> Self {
 743        let semver = body.semver();
 744        let time =
 745            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 746
 747        Self {
 748            app_version: body.app_version.clone(),
 749            major: semver.map(|v| v.major() as i32),
 750            minor: semver.map(|v| v.minor() as i32),
 751            patch: semver.map(|v| v.patch() as i32),
 752            release_channel: body.release_channel.clone().unwrap_or_default(),
 753            installation_id: body.installation_id.clone(),
 754            session_id: body.session_id.clone(),
 755            is_staff: body.is_staff,
 756            time: time.timestamp_millis(),
 757            usage_as_percentage: event.usage_as_percentage,
 758            core_count: event.core_count,
 759        }
 760    }
 761}
 762
 763#[derive(Serialize, Debug, clickhouse::Row)]
 764pub struct MemoryEventRow {
 765    // AppInfoBase
 766    app_version: String,
 767    major: Option<i32>,
 768    minor: Option<i32>,
 769    patch: Option<i32>,
 770    release_channel: String,
 771
 772    // ClientEventBase
 773    installation_id: Option<String>,
 774    session_id: Option<String>,
 775    is_staff: Option<bool>,
 776    time: i64,
 777
 778    // MemoryEventRow
 779    memory_in_bytes: u64,
 780    virtual_memory_in_bytes: u64,
 781}
 782
 783impl MemoryEventRow {
 784    fn from_event(
 785        event: MemoryEvent,
 786        wrapper: &EventWrapper,
 787        body: &EventRequestBody,
 788        first_event_at: chrono::DateTime<chrono::Utc>,
 789    ) -> Self {
 790        let semver = body.semver();
 791        let time =
 792            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 793
 794        Self {
 795            app_version: body.app_version.clone(),
 796            major: semver.map(|v| v.major() as i32),
 797            minor: semver.map(|v| v.minor() as i32),
 798            patch: semver.map(|v| v.patch() as i32),
 799            release_channel: body.release_channel.clone().unwrap_or_default(),
 800            installation_id: body.installation_id.clone(),
 801            session_id: body.session_id.clone(),
 802            is_staff: body.is_staff,
 803            time: time.timestamp_millis(),
 804            memory_in_bytes: event.memory_in_bytes,
 805            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
 806        }
 807    }
 808}
 809
 810#[derive(Serialize, Debug, clickhouse::Row)]
 811pub struct AppEventRow {
 812    // AppInfoBase
 813    app_version: String,
 814    major: Option<i32>,
 815    minor: Option<i32>,
 816    patch: Option<i32>,
 817    release_channel: String,
 818
 819    // ClientEventBase
 820    installation_id: Option<String>,
 821    session_id: Option<String>,
 822    is_staff: Option<bool>,
 823    time: i64,
 824
 825    // AppEventRow
 826    operation: String,
 827}
 828
 829impl AppEventRow {
 830    fn from_event(
 831        event: AppEvent,
 832        wrapper: &EventWrapper,
 833        body: &EventRequestBody,
 834        first_event_at: chrono::DateTime<chrono::Utc>,
 835    ) -> Self {
 836        let semver = body.semver();
 837        let time =
 838            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 839
 840        Self {
 841            app_version: body.app_version.clone(),
 842            major: semver.map(|v| v.major() as i32),
 843            minor: semver.map(|v| v.minor() as i32),
 844            patch: semver.map(|v| v.patch() as i32),
 845            release_channel: body.release_channel.clone().unwrap_or_default(),
 846            installation_id: body.installation_id.clone(),
 847            session_id: body.session_id.clone(),
 848            is_staff: body.is_staff,
 849            time: time.timestamp_millis(),
 850            operation: event.operation,
 851        }
 852    }
 853}
 854
/// Row representation of a `SettingEvent`, serializable via `serde` and usable
/// as a `clickhouse::Row`.
///
/// Instances are produced by `SettingEventRow::from_event`.
// NOTE(review): field order presumably has to line up with the column order of
// the destination table — confirm before reordering fields.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    /// App version string, cloned from the request body.
    app_version: String,
    /// Major component of `body.semver()`; `None` when no semver is available.
    major: Option<i32>,
    /// Minor component of `body.semver()`; `None` when no semver is available.
    minor: Option<i32>,
    /// Patch component of `body.semver()`; `None` when no semver is available.
    patch: Option<i32>,
    /// Release channel from the request body; the default (empty) string when absent.
    release_channel: String,

    // ClientEventBase
    /// Installation id from the request body, if any.
    installation_id: Option<String>,
    /// Session id from the request body, if any.
    session_id: Option<String>,
    /// Staff flag from the request body, if any.
    is_staff: Option<bool>,
    /// Event time as milliseconds since the Unix epoch.
    time: i64,
    // SettingEventRow
    /// The `setting` field of the `SettingEvent`.
    setting: String,
    /// The `value` field of the `SettingEvent`.
    value: String,
}
 873
 874impl SettingEventRow {
 875    fn from_event(
 876        event: SettingEvent,
 877        wrapper: &EventWrapper,
 878        body: &EventRequestBody,
 879        first_event_at: chrono::DateTime<chrono::Utc>,
 880    ) -> Self {
 881        let semver = body.semver();
 882        let time =
 883            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 884
 885        Self {
 886            app_version: body.app_version.clone(),
 887            major: semver.map(|v| v.major() as i32),
 888            minor: semver.map(|v| v.minor() as i32),
 889            patch: semver.map(|v| v.patch() as i32),
 890            release_channel: body.release_channel.clone().unwrap_or_default(),
 891            installation_id: body.installation_id.clone(),
 892            session_id: body.session_id.clone(),
 893            is_staff: body.is_staff,
 894            time: time.timestamp_millis(),
 895            setting: event.setting,
 896            value: event.value,
 897        }
 898    }
 899}
 900
/// Row representation of an `ExtensionEvent`, serializable via `serde` and
/// usable as a `clickhouse::Row`.
///
/// Instances are produced by `ExtensionEventRow::from_event`.
// NOTE(review): field order presumably has to line up with the column order of
// the destination table — confirm before reordering fields.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ExtensionEventRow {
    // AppInfoBase
    /// App version string, cloned from the request body.
    app_version: String,
    /// Major component of `body.semver()`; `None` when no semver is available.
    major: Option<i32>,
    /// Minor component of `body.semver()`; `None` when no semver is available.
    minor: Option<i32>,
    /// Patch component of `body.semver()`; `None` when no semver is available.
    patch: Option<i32>,
    /// Release channel from the request body; the default (empty) string when absent.
    release_channel: String,

    // ClientEventBase
    /// Installation id from the request body, if any.
    installation_id: Option<String>,
    /// Session id from the request body, if any.
    session_id: Option<String>,
    /// Staff flag from the request body, if any.
    is_staff: Option<bool>,
    /// Event time as milliseconds since the Unix epoch.
    time: i64,

    // ExtensionEventRow
    /// The `extension_id` carried by the `ExtensionEvent`.
    extension_id: Arc<str>,
    /// The `version` carried by the `ExtensionEvent`.
    extension_version: Arc<str>,
    /// `true` when no `ExtensionMetadata` was supplied for the extension.
    dev: bool,
    /// `schema_version` from the metadata's manifest, when metadata is present.
    schema_version: Option<i32>,
    /// Manifest `wasm_api_version` rendered with `to_string()`, when present.
    wasm_api_version: Option<String>,
}
 923
 924impl ExtensionEventRow {
 925    fn from_event(
 926        event: ExtensionEvent,
 927        wrapper: &EventWrapper,
 928        body: &EventRequestBody,
 929        extension_metadata: Option<ExtensionMetadata>,
 930        first_event_at: chrono::DateTime<chrono::Utc>,
 931    ) -> Self {
 932        let semver = body.semver();
 933        let time =
 934            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 935
 936        Self {
 937            app_version: body.app_version.clone(),
 938            major: semver.map(|v| v.major() as i32),
 939            minor: semver.map(|v| v.minor() as i32),
 940            patch: semver.map(|v| v.patch() as i32),
 941            release_channel: body.release_channel.clone().unwrap_or_default(),
 942            installation_id: body.installation_id.clone(),
 943            session_id: body.session_id.clone(),
 944            is_staff: body.is_staff,
 945            time: time.timestamp_millis(),
 946            extension_id: event.extension_id,
 947            extension_version: event.version,
 948            dev: extension_metadata.is_none(),
 949            schema_version: extension_metadata
 950                .as_ref()
 951                .and_then(|metadata| metadata.manifest.schema_version),
 952            wasm_api_version: extension_metadata.as_ref().and_then(|metadata| {
 953                metadata
 954                    .manifest
 955                    .wasm_api_version
 956                    .as_ref()
 957                    .map(|version| version.to_string())
 958            }),
 959        }
 960    }
 961}
 962
/// Row representation of an `EditEvent`, serializable via `serde` and usable as
/// a `clickhouse::Row`.
///
/// Instances are produced by `EditEventRow::from_event`.
// NOTE(review): field order presumably has to line up with the column order of
// the destination table — confirm before reordering fields.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    /// App version string, cloned from the request body.
    app_version: String,
    /// Major component of `body.semver()`; `None` when no semver is available.
    major: Option<i32>,
    /// Minor component of `body.semver()`; `None` when no semver is available.
    minor: Option<i32>,
    /// Patch component of `body.semver()`; `None` when no semver is available.
    patch: Option<i32>,
    /// Release channel from the request body; the default (empty) string when absent.
    release_channel: String,

    // ClientEventBase
    /// Installation id from the request body, if any.
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    // The serialized name is deliberately misspelled to match it; do not "fix" the rename.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    /// Staff flag from the request body, if any.
    is_staff: Option<bool>,
    /// Event time as milliseconds since the Unix epoch.
    time: i64,

    // EditEventRow
    /// Start of the edit period: event time minus the event's `duration`
    /// (milliseconds), as Unix milliseconds.
    period_start: i64,
    /// End of the edit period; equal to the event time, as Unix milliseconds.
    period_end: i64,
    /// The `environment` carried by the `EditEvent`.
    environment: String,
}
 985
 986impl EditEventRow {
 987    fn from_event(
 988        event: EditEvent,
 989        wrapper: &EventWrapper,
 990        body: &EventRequestBody,
 991        first_event_at: chrono::DateTime<chrono::Utc>,
 992    ) -> Self {
 993        let semver = body.semver();
 994        let time =
 995            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
 996
 997        let period_start = time - chrono::Duration::milliseconds(event.duration);
 998        let period_end = time;
 999
1000        Self {
1001            app_version: body.app_version.clone(),
1002            major: semver.map(|v| v.major() as i32),
1003            minor: semver.map(|v| v.minor() as i32),
1004            patch: semver.map(|v| v.patch() as i32),
1005            release_channel: body.release_channel.clone().unwrap_or_default(),
1006            installation_id: body.installation_id.clone(),
1007            session_id: body.session_id.clone(),
1008            is_staff: body.is_staff,
1009            time: time.timestamp_millis(),
1010            period_start: period_start.timestamp_millis(),
1011            period_end: period_end.timestamp_millis(),
1012            environment: event.environment,
1013        }
1014    }
1015}
1016
/// Row representation of an `ActionEvent`, serializable via `serde` and usable
/// as a `clickhouse::Row`.
///
/// Instances are produced by `ActionEventRow::from_event`.
// NOTE(review): field order presumably has to line up with the column order of
// the destination table — confirm before reordering fields.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    /// App version string, cloned from the request body.
    app_version: String,
    /// Major component of `body.semver()`; `None` when no semver is available.
    major: Option<i32>,
    /// Minor component of `body.semver()`; `None` when no semver is available.
    minor: Option<i32>,
    /// Patch component of `body.semver()`; `None` when no semver is available.
    patch: Option<i32>,
    /// Release channel from the request body; the default (empty) string when absent.
    release_channel: String,

    // ClientEventBase
    /// Installation id from the request body, if any.
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    // The serialized name is deliberately misspelled to match it; do not "fix" the rename.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    /// Staff flag from the request body, if any.
    is_staff: Option<bool>,
    /// Event time as milliseconds since the Unix epoch.
    time: i64,
    // ActionEventRow
    /// The `source` carried by the `ActionEvent`.
    source: String,
    /// The `action` carried by the `ActionEvent`.
    action: String,
}
1037
1038impl ActionEventRow {
1039    fn from_event(
1040        event: ActionEvent,
1041        wrapper: &EventWrapper,
1042        body: &EventRequestBody,
1043        first_event_at: chrono::DateTime<chrono::Utc>,
1044    ) -> Self {
1045        let semver = body.semver();
1046        let time =
1047            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
1048
1049        Self {
1050            app_version: body.app_version.clone(),
1051            major: semver.map(|v| v.major() as i32),
1052            minor: semver.map(|v| v.minor() as i32),
1053            patch: semver.map(|v| v.patch() as i32),
1054            release_channel: body.release_channel.clone().unwrap_or_default(),
1055            installation_id: body.installation_id.clone(),
1056            session_id: body.session_id.clone(),
1057            is_staff: body.is_staff,
1058            time: time.timestamp_millis(),
1059            source: event.source,
1060            action: event.action,
1061        }
1062    }
1063}