events.rs

  1use std::sync::{Arc, OnceLock};
  2
  3use anyhow::{anyhow, Context};
  4use aws_sdk_s3::primitives::ByteStream;
  5use axum::{
  6    body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
  7};
  8use hyper::{HeaderMap, StatusCode};
  9use serde::{Serialize, Serializer};
 10use sha2::{Digest, Sha256};
 11use telemetry_events::{
 12    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
 13    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 14};
 15use util::SemanticVersion;
 16
 17use crate::{api::slack, AppState, Error, Result};
 18
 19use super::ips_file::IpsFile;
 20
 21pub fn router() -> Router {
 22    Router::new()
 23        .route("/telemetry/events", post(post_events))
 24        .route("/telemetry/crashes", post(post_crash))
 25}
 26
 27pub struct ZedChecksumHeader(Vec<u8>);
 28
 29impl Header for ZedChecksumHeader {
 30    fn name() -> &'static HeaderName {
 31        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 32        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 33    }
 34
 35    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 36    where
 37        Self: Sized,
 38        I: Iterator<Item = &'i axum::http::HeaderValue>,
 39    {
 40        let checksum = values
 41            .next()
 42            .ok_or_else(axum::headers::Error::invalid)?
 43            .to_str()
 44            .map_err(|_| axum::headers::Error::invalid())?;
 45
 46        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 47        Ok(Self(bytes))
 48    }
 49
 50    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 51        unimplemented!()
 52    }
 53}
 54
 55pub struct CloudflareIpCountryHeader(String);
 56
 57impl Header for CloudflareIpCountryHeader {
 58    fn name() -> &'static HeaderName {
 59        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
 60        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
 61    }
 62
 63    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 64    where
 65        Self: Sized,
 66        I: Iterator<Item = &'i axum::http::HeaderValue>,
 67    {
 68        let country_code = values
 69            .next()
 70            .ok_or_else(axum::headers::Error::invalid)?
 71            .to_str()
 72            .map_err(|_| axum::headers::Error::invalid())?;
 73
 74        Ok(Self(country_code.to_string()))
 75    }
 76
 77    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 78        unimplemented!()
 79    }
 80}
 81
 82pub async fn post_crash(
 83    Extension(app): Extension<Arc<AppState>>,
 84    body: Bytes,
 85    headers: HeaderMap,
 86) -> Result<()> {
 87    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 88
 89    let report = IpsFile::parse(&body)?;
 90    let version_threshold = SemanticVersion::new(0, 123, 0);
 91
 92    let bundle_id = &report.header.bundle_id;
 93    let app_version = &report.app_version();
 94
 95    if bundle_id == "dev.zed.Zed-Dev" {
 96        log::error!("Crash uploads from {} are ignored.", bundle_id);
 97        return Ok(());
 98    }
 99
100    if app_version.is_none() || app_version.unwrap() < version_threshold {
101        log::error!(
102            "Crash uploads from {} are ignored.",
103            report.header.app_version
104        );
105        return Ok(());
106    }
107    let app_version = app_version.unwrap();
108
109    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
110        let response = blob_store_client
111            .head_object()
112            .bucket(CRASH_REPORTS_BUCKET)
113            .key(report.header.incident_id.clone() + ".ips")
114            .send()
115            .await;
116
117        if response.is_ok() {
118            log::info!("We've already uploaded this crash");
119            return Ok(());
120        }
121
122        blob_store_client
123            .put_object()
124            .bucket(CRASH_REPORTS_BUCKET)
125            .key(report.header.incident_id.clone() + ".ips")
126            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
127            .body(ByteStream::from(body.to_vec()))
128            .send()
129            .await
130            .map_err(|e| log::error!("Failed to upload crash: {}", e))
131            .ok();
132    }
133
134    let recent_panic_on: Option<i64> = headers
135        .get("x-zed-panicked-on")
136        .and_then(|h| h.to_str().ok())
137        .and_then(|s| s.parse().ok());
138    let mut recent_panic = None;
139
140    if let Some(recent_panic_on) = recent_panic_on {
141        let crashed_at = match report.timestamp() {
142            Ok(t) => Some(t),
143            Err(e) => {
144                log::error!("Can't parse {}: {}", report.header.timestamp, e);
145                None
146            }
147        };
148        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
149            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
150        }
151    }
152
153    let description = report.description(recent_panic);
154    let summary = report.backtrace_summary();
155
156    tracing::error!(
157        service = "client",
158        version = %report.header.app_version,
159        os_version = %report.header.os_version,
160        bundle_id = %report.header.bundle_id,
161        incident_id = %report.header.incident_id,
162        description = %description,
163        backtrace = %summary,
164        "crash report");
165
166    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
167        let payload = slack::WebhookBody::new(|w| {
168            w.add_section(|s| s.text(slack::Text::markdown(description)))
169                .add_section(|s| {
170                    s.add_field(slack::Text::markdown(format!(
171                        "*Version:*\n{} ({})",
172                        bundle_id, app_version
173                    )))
174                    .add_field({
175                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
176                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
177                            hostname.strip_prefix("http://").unwrap_or_default()
178                        });
179
180                        slack::Text::markdown(format!(
181                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
182                            CRASH_REPORTS_BUCKET,
183                            hostname,
184                            report.header.incident_id,
185                            report
186                                .header
187                                .incident_id
188                                .chars()
189                                .take(8)
190                                .collect::<String>(),
191                        ))
192                    })
193                })
194                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
195        });
196        let payload_json = serde_json::to_string(&payload).map_err(|err| {
197            log::error!("Failed to serialize payload to JSON: {err}");
198            Error::Internal(anyhow!(err))
199        })?;
200
201        reqwest::Client::new()
202            .post(slack_panics_webhook)
203            .header("Content-Type", "application/json")
204            .body(payload_json)
205            .send()
206            .await
207            .map_err(|err| {
208                log::error!("Failed to send payload to Slack: {err}");
209                Error::Internal(anyhow!(err))
210            })?;
211    }
212
213    Ok(())
214}
215
216pub async fn post_events(
217    Extension(app): Extension<Arc<AppState>>,
218    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
219    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
220    body: Bytes,
221) -> Result<()> {
222    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
223        Err(Error::Http(
224            StatusCode::NOT_IMPLEMENTED,
225            "not supported".into(),
226        ))?
227    };
228
229    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
230        return Err(Error::Http(
231            StatusCode::INTERNAL_SERVER_ERROR,
232            "events not enabled".into(),
233        ))?;
234    };
235
236    let mut summer = Sha256::new();
237    summer.update(checksum_seed);
238    summer.update(&body);
239    summer.update(checksum_seed);
240
241    if &checksum != &summer.finalize()[..] {
242        return Err(Error::Http(
243            StatusCode::BAD_REQUEST,
244            "invalid checksum".into(),
245        ))?;
246    }
247
248    let request_body: telemetry_events::EventRequestBody =
249        serde_json::from_slice(&body).map_err(|err| {
250            log::error!("can't parse event json: {err}");
251            Error::Internal(anyhow!(err))
252        })?;
253
254    let mut to_upload = ToUpload::default();
255    let Some(last_event) = request_body.events.last() else {
256        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
257    };
258    let country_code = country_code_header.map(|h| h.0 .0);
259
260    let first_event_at = chrono::Utc::now()
261        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
262
263    for wrapper in &request_body.events {
264        match &wrapper.event {
265            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
266                event.clone(),
267                &wrapper,
268                &request_body,
269                first_event_at,
270                country_code.clone(),
271            )),
272            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
273                event.clone(),
274                &wrapper,
275                &request_body,
276                first_event_at,
277                country_code.clone(),
278            )),
279            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
280                event.clone(),
281                &wrapper,
282                &request_body,
283                first_event_at,
284            )),
285            Event::Assistant(event) => {
286                to_upload
287                    .assistant_events
288                    .push(AssistantEventRow::from_event(
289                        event.clone(),
290                        &wrapper,
291                        &request_body,
292                        first_event_at,
293                    ))
294            }
295            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
296                event.clone(),
297                &wrapper,
298                &request_body,
299                first_event_at,
300            )),
301            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
302                event.clone(),
303                &wrapper,
304                &request_body,
305                first_event_at,
306            )),
307            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
308                event.clone(),
309                &wrapper,
310                &request_body,
311                first_event_at,
312            )),
313            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
314                event.clone(),
315                &wrapper,
316                &request_body,
317                first_event_at,
318            )),
319            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
320                event.clone(),
321                &wrapper,
322                &request_body,
323                first_event_at,
324            )),
325            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
326                event.clone(),
327                &wrapper,
328                &request_body,
329                first_event_at,
330            )),
331        }
332    }
333
334    to_upload
335        .upload(&clickhouse_client)
336        .await
337        .map_err(|err| Error::Internal(anyhow!(err)))?;
338
339    Ok(())
340}
341
342#[derive(Default)]
343struct ToUpload {
344    editor_events: Vec<EditorEventRow>,
345    copilot_events: Vec<CopilotEventRow>,
346    assistant_events: Vec<AssistantEventRow>,
347    call_events: Vec<CallEventRow>,
348    cpu_events: Vec<CpuEventRow>,
349    memory_events: Vec<MemoryEventRow>,
350    app_events: Vec<AppEventRow>,
351    setting_events: Vec<SettingEventRow>,
352    edit_events: Vec<EditEventRow>,
353    action_events: Vec<ActionEventRow>,
354}
355
356impl ToUpload {
357    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
358        const EDITOR_EVENTS_TABLE: &str = "editor_events";
359        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
360            .await
361            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
362
363        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
364        Self::upload_to_table(
365            COPILOT_EVENTS_TABLE,
366            &self.copilot_events,
367            clickhouse_client,
368        )
369        .await
370        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
371
372        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
373        Self::upload_to_table(
374            ASSISTANT_EVENTS_TABLE,
375            &self.assistant_events,
376            clickhouse_client,
377        )
378        .await
379        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
380
381        const CALL_EVENTS_TABLE: &str = "call_events";
382        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
383            .await
384            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
385
386        const CPU_EVENTS_TABLE: &str = "cpu_events";
387        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
388            .await
389            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
390
391        const MEMORY_EVENTS_TABLE: &str = "memory_events";
392        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
393            .await
394            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
395
396        const APP_EVENTS_TABLE: &str = "app_events";
397        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
398            .await
399            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
400
401        const SETTING_EVENTS_TABLE: &str = "setting_events";
402        Self::upload_to_table(
403            SETTING_EVENTS_TABLE,
404            &self.setting_events,
405            clickhouse_client,
406        )
407        .await
408        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
409
410        const EDIT_EVENTS_TABLE: &str = "edit_events";
411        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
412            .await
413            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
414
415        const ACTION_EVENTS_TABLE: &str = "action_events";
416        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
417            .await
418            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
419
420        Ok(())
421    }
422
423    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
424        table: &str,
425        rows: &[T],
426        clickhouse_client: &clickhouse::Client,
427    ) -> anyhow::Result<()> {
428        if !rows.is_empty() {
429            let mut insert = clickhouse_client.insert(table)?;
430
431            for event in rows {
432                insert.write(event).await?;
433            }
434
435            insert.end().await?;
436        }
437
438        Ok(())
439    }
440}
441
442pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
443where
444    S: Serializer,
445{
446    if country_code.len() != 2 {
447        use serde::ser::Error;
448        return Err(S::Error::custom(
449            "country_code must be exactly 2 characters",
450        ));
451    }
452
453    let country_code = country_code.as_bytes();
454
455    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
456}
457
458#[derive(Serialize, Debug, clickhouse::Row)]
459pub struct EditorEventRow {
460    pub installation_id: String,
461    pub operation: String,
462    pub app_version: String,
463    pub file_extension: String,
464    pub os_name: String,
465    pub os_version: String,
466    pub release_channel: String,
467    pub signed_in: bool,
468    pub vim_mode: bool,
469    #[serde(serialize_with = "serialize_country_code")]
470    pub country_code: String,
471    pub region_code: String,
472    pub city: String,
473    pub time: i64,
474    pub copilot_enabled: bool,
475    pub copilot_enabled_for_language: bool,
476    pub historical_event: bool,
477    pub architecture: String,
478    pub is_staff: Option<bool>,
479    pub session_id: Option<String>,
480    pub major: Option<i32>,
481    pub minor: Option<i32>,
482    pub patch: Option<i32>,
483}
484
485impl EditorEventRow {
486    fn from_event(
487        event: EditorEvent,
488        wrapper: &EventWrapper,
489        body: &EventRequestBody,
490        first_event_at: chrono::DateTime<chrono::Utc>,
491        country_code: Option<String>,
492    ) -> Self {
493        let semver = body.semver();
494        let time =
495            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
496
497        Self {
498            app_version: body.app_version.clone(),
499            major: semver.map(|s| s.major as i32),
500            minor: semver.map(|s| s.minor as i32),
501            patch: semver.map(|s| s.patch as i32),
502            release_channel: body.release_channel.clone().unwrap_or_default(),
503            os_name: body.os_name.clone(),
504            os_version: body.os_version.clone().unwrap_or_default(),
505            architecture: body.architecture.clone(),
506            installation_id: body.installation_id.clone().unwrap_or_default(),
507            session_id: body.session_id.clone(),
508            is_staff: body.is_staff,
509            time: time.timestamp_millis(),
510            operation: event.operation,
511            file_extension: event.file_extension.unwrap_or_default(),
512            signed_in: wrapper.signed_in,
513            vim_mode: event.vim_mode,
514            copilot_enabled: event.copilot_enabled,
515            copilot_enabled_for_language: event.copilot_enabled_for_language,
516            country_code: country_code.unwrap_or("XX".to_string()),
517            region_code: "".to_string(),
518            city: "".to_string(),
519            historical_event: false,
520        }
521    }
522}
523
524#[derive(Serialize, Debug, clickhouse::Row)]
525pub struct CopilotEventRow {
526    pub installation_id: String,
527    pub suggestion_id: String,
528    pub suggestion_accepted: bool,
529    pub app_version: String,
530    pub file_extension: String,
531    pub os_name: String,
532    pub os_version: String,
533    pub release_channel: String,
534    pub signed_in: bool,
535    #[serde(serialize_with = "serialize_country_code")]
536    pub country_code: String,
537    pub region_code: String,
538    pub city: String,
539    pub time: i64,
540    pub is_staff: Option<bool>,
541    pub session_id: Option<String>,
542    pub major: Option<i32>,
543    pub minor: Option<i32>,
544    pub patch: Option<i32>,
545}
546
547impl CopilotEventRow {
548    fn from_event(
549        event: CopilotEvent,
550        wrapper: &EventWrapper,
551        body: &EventRequestBody,
552        first_event_at: chrono::DateTime<chrono::Utc>,
553        country_code: Option<String>,
554    ) -> Self {
555        let semver = body.semver();
556        let time =
557            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
558
559        Self {
560            app_version: body.app_version.clone(),
561            major: semver.map(|s| s.major as i32),
562            minor: semver.map(|s| s.minor as i32),
563            patch: semver.map(|s| s.patch as i32),
564            release_channel: body.release_channel.clone().unwrap_or_default(),
565            os_name: body.os_name.clone(),
566            os_version: body.os_version.clone().unwrap_or_default(),
567            installation_id: body.installation_id.clone().unwrap_or_default(),
568            session_id: body.session_id.clone(),
569            is_staff: body.is_staff,
570            time: time.timestamp_millis(),
571            file_extension: event.file_extension.unwrap_or_default(),
572            signed_in: wrapper.signed_in,
573            country_code: country_code.unwrap_or("XX".to_string()),
574            region_code: "".to_string(),
575            city: "".to_string(),
576            suggestion_id: event.suggestion_id.unwrap_or_default(),
577            suggestion_accepted: event.suggestion_accepted,
578        }
579    }
580}
581
582#[derive(Serialize, Debug, clickhouse::Row)]
583pub struct CallEventRow {
584    // AppInfoBase
585    app_version: String,
586    major: Option<i32>,
587    minor: Option<i32>,
588    patch: Option<i32>,
589    release_channel: String,
590
591    // ClientEventBase
592    installation_id: String,
593    session_id: Option<String>,
594    is_staff: Option<bool>,
595    time: i64,
596
597    // CallEventRow
598    operation: String,
599    room_id: Option<u64>,
600    channel_id: Option<u64>,
601}
602
603impl CallEventRow {
604    fn from_event(
605        event: CallEvent,
606        wrapper: &EventWrapper,
607        body: &EventRequestBody,
608        first_event_at: chrono::DateTime<chrono::Utc>,
609    ) -> Self {
610        let semver = body.semver();
611        let time =
612            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
613
614        Self {
615            app_version: body.app_version.clone(),
616            major: semver.map(|s| s.major as i32),
617            minor: semver.map(|s| s.minor as i32),
618            patch: semver.map(|s| s.patch as i32),
619            release_channel: body.release_channel.clone().unwrap_or_default(),
620            installation_id: body.installation_id.clone().unwrap_or_default(),
621            session_id: body.session_id.clone(),
622            is_staff: body.is_staff,
623            time: time.timestamp_millis(),
624            operation: event.operation,
625            room_id: event.room_id,
626            channel_id: event.channel_id,
627        }
628    }
629}
630
631#[derive(Serialize, Debug, clickhouse::Row)]
632pub struct AssistantEventRow {
633    // AppInfoBase
634    app_version: String,
635    major: Option<i32>,
636    minor: Option<i32>,
637    patch: Option<i32>,
638    release_channel: String,
639
640    // ClientEventBase
641    installation_id: Option<String>,
642    session_id: Option<String>,
643    is_staff: Option<bool>,
644    time: i64,
645
646    // AssistantEventRow
647    conversation_id: String,
648    kind: String,
649    model: String,
650}
651
652impl AssistantEventRow {
653    fn from_event(
654        event: AssistantEvent,
655        wrapper: &EventWrapper,
656        body: &EventRequestBody,
657        first_event_at: chrono::DateTime<chrono::Utc>,
658    ) -> Self {
659        let semver = body.semver();
660        let time =
661            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
662
663        Self {
664            app_version: body.app_version.clone(),
665            major: semver.map(|s| s.major as i32),
666            minor: semver.map(|s| s.minor as i32),
667            patch: semver.map(|s| s.patch as i32),
668            release_channel: body.release_channel.clone().unwrap_or_default(),
669            installation_id: body.installation_id.clone(),
670            session_id: body.session_id.clone(),
671            is_staff: body.is_staff,
672            time: time.timestamp_millis(),
673            conversation_id: event.conversation_id.unwrap_or_default(),
674            kind: event.kind.to_string(),
675            model: event.model,
676        }
677    }
678}
679
680#[derive(Debug, clickhouse::Row, Serialize)]
681pub struct CpuEventRow {
682    pub installation_id: Option<String>,
683    pub is_staff: Option<bool>,
684    pub usage_as_percentage: f32,
685    pub core_count: u32,
686    pub app_version: String,
687    pub release_channel: String,
688    pub time: i64,
689    pub session_id: Option<String>,
690    // pub normalized_cpu_usage: f64, MATERIALIZED
691    pub major: Option<i32>,
692    pub minor: Option<i32>,
693    pub patch: Option<i32>,
694}
695
696impl CpuEventRow {
697    fn from_event(
698        event: CpuEvent,
699        wrapper: &EventWrapper,
700        body: &EventRequestBody,
701        first_event_at: chrono::DateTime<chrono::Utc>,
702    ) -> Self {
703        let semver = body.semver();
704        let time =
705            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
706
707        Self {
708            app_version: body.app_version.clone(),
709            major: semver.map(|s| s.major as i32),
710            minor: semver.map(|s| s.minor as i32),
711            patch: semver.map(|s| s.patch as i32),
712            release_channel: body.release_channel.clone().unwrap_or_default(),
713            installation_id: body.installation_id.clone(),
714            session_id: body.session_id.clone(),
715            is_staff: body.is_staff,
716            time: time.timestamp_millis(),
717            usage_as_percentage: event.usage_as_percentage,
718            core_count: event.core_count,
719        }
720    }
721}
722
723#[derive(Serialize, Debug, clickhouse::Row)]
724pub struct MemoryEventRow {
725    // AppInfoBase
726    app_version: String,
727    major: Option<i32>,
728    minor: Option<i32>,
729    patch: Option<i32>,
730    release_channel: String,
731
732    // ClientEventBase
733    installation_id: Option<String>,
734    session_id: Option<String>,
735    is_staff: Option<bool>,
736    time: i64,
737
738    // MemoryEventRow
739    memory_in_bytes: u64,
740    virtual_memory_in_bytes: u64,
741}
742
743impl MemoryEventRow {
744    fn from_event(
745        event: MemoryEvent,
746        wrapper: &EventWrapper,
747        body: &EventRequestBody,
748        first_event_at: chrono::DateTime<chrono::Utc>,
749    ) -> Self {
750        let semver = body.semver();
751        let time =
752            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
753
754        Self {
755            app_version: body.app_version.clone(),
756            major: semver.map(|s| s.major as i32),
757            minor: semver.map(|s| s.minor as i32),
758            patch: semver.map(|s| s.patch as i32),
759            release_channel: body.release_channel.clone().unwrap_or_default(),
760            installation_id: body.installation_id.clone(),
761            session_id: body.session_id.clone(),
762            is_staff: body.is_staff,
763            time: time.timestamp_millis(),
764            memory_in_bytes: event.memory_in_bytes,
765            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
766        }
767    }
768}
769
770#[derive(Serialize, Debug, clickhouse::Row)]
771pub struct AppEventRow {
772    // AppInfoBase
773    app_version: String,
774    major: Option<i32>,
775    minor: Option<i32>,
776    patch: Option<i32>,
777    release_channel: String,
778
779    // ClientEventBase
780    installation_id: Option<String>,
781    session_id: Option<String>,
782    is_staff: Option<bool>,
783    time: i64,
784
785    // AppEventRow
786    operation: String,
787}
788
789impl AppEventRow {
790    fn from_event(
791        event: AppEvent,
792        wrapper: &EventWrapper,
793        body: &EventRequestBody,
794        first_event_at: chrono::DateTime<chrono::Utc>,
795    ) -> Self {
796        let semver = body.semver();
797        let time =
798            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
799
800        Self {
801            app_version: body.app_version.clone(),
802            major: semver.map(|s| s.major as i32),
803            minor: semver.map(|s| s.minor as i32),
804            patch: semver.map(|s| s.patch as i32),
805            release_channel: body.release_channel.clone().unwrap_or_default(),
806            installation_id: body.installation_id.clone(),
807            session_id: body.session_id.clone(),
808            is_staff: body.is_staff,
809            time: time.timestamp_millis(),
810            operation: event.operation,
811        }
812    }
813}
814
815#[derive(Serialize, Debug, clickhouse::Row)]
816pub struct SettingEventRow {
817    // AppInfoBase
818    app_version: String,
819    major: Option<i32>,
820    minor: Option<i32>,
821    patch: Option<i32>,
822    release_channel: String,
823
824    // ClientEventBase
825    installation_id: Option<String>,
826    session_id: Option<String>,
827    is_staff: Option<bool>,
828    time: i64,
829    // SettingEventRow
830    setting: String,
831    value: String,
832}
833
834impl SettingEventRow {
835    fn from_event(
836        event: SettingEvent,
837        wrapper: &EventWrapper,
838        body: &EventRequestBody,
839        first_event_at: chrono::DateTime<chrono::Utc>,
840    ) -> Self {
841        let semver = body.semver();
842        let time =
843            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
844
845        Self {
846            app_version: body.app_version.clone(),
847            major: semver.map(|s| s.major as i32),
848            minor: semver.map(|s| s.minor as i32),
849            patch: semver.map(|s| s.patch as i32),
850            release_channel: body.release_channel.clone().unwrap_or_default(),
851            installation_id: body.installation_id.clone(),
852            session_id: body.session_id.clone(),
853            is_staff: body.is_staff,
854            time: time.timestamp_millis(),
855            setting: event.setting,
856            value: event.value,
857        }
858    }
859}
860
861#[derive(Serialize, Debug, clickhouse::Row)]
862pub struct EditEventRow {
863    // AppInfoBase
864    app_version: String,
865    major: Option<i32>,
866    minor: Option<i32>,
867    patch: Option<i32>,
868    release_channel: String,
869
870    // ClientEventBase
871    installation_id: Option<String>,
872    // Note: This column name has a typo in the ClickHouse table.
873    #[serde(rename = "sesssion_id")]
874    session_id: Option<String>,
875    is_staff: Option<bool>,
876    time: i64,
877
878    // EditEventRow
879    period_start: i64,
880    period_end: i64,
881    environment: String,
882}
883
884impl EditEventRow {
885    fn from_event(
886        event: EditEvent,
887        wrapper: &EventWrapper,
888        body: &EventRequestBody,
889        first_event_at: chrono::DateTime<chrono::Utc>,
890    ) -> Self {
891        let semver = body.semver();
892        let time =
893            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
894
895        let period_start = time - chrono::Duration::milliseconds(event.duration);
896        let period_end = time;
897
898        Self {
899            app_version: body.app_version.clone(),
900            major: semver.map(|s| s.major as i32),
901            minor: semver.map(|s| s.minor as i32),
902            patch: semver.map(|s| s.patch as i32),
903            release_channel: body.release_channel.clone().unwrap_or_default(),
904            installation_id: body.installation_id.clone(),
905            session_id: body.session_id.clone(),
906            is_staff: body.is_staff,
907            time: time.timestamp_millis(),
908            period_start: period_start.timestamp_millis(),
909            period_end: period_end.timestamp_millis(),
910            environment: event.environment,
911        }
912    }
913}
914
915#[derive(Serialize, Debug, clickhouse::Row)]
916pub struct ActionEventRow {
917    // AppInfoBase
918    app_version: String,
919    major: Option<i32>,
920    minor: Option<i32>,
921    patch: Option<i32>,
922    release_channel: String,
923
924    // ClientEventBase
925    installation_id: Option<String>,
926    // Note: This column name has a typo in the ClickHouse table.
927    #[serde(rename = "sesssion_id")]
928    session_id: Option<String>,
929    is_staff: Option<bool>,
930    time: i64,
931    // ActionEventRow
932    source: String,
933    action: String,
934}
935
936impl ActionEventRow {
937    fn from_event(
938        event: ActionEvent,
939        wrapper: &EventWrapper,
940        body: &EventRequestBody,
941        first_event_at: chrono::DateTime<chrono::Utc>,
942    ) -> Self {
943        let semver = body.semver();
944        let time =
945            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
946
947        Self {
948            app_version: body.app_version.clone(),
949            major: semver.map(|s| s.major as i32),
950            minor: semver.map(|s| s.minor as i32),
951            patch: semver.map(|s| s.patch as i32),
952            release_channel: body.release_channel.clone().unwrap_or_default(),
953            installation_id: body.installation_id.clone(),
954            session_id: body.session_id.clone(),
955            is_staff: body.is_staff,
956            time: time.timestamp_millis(),
957            source: event.source,
958            action: event.action,
959        }
960    }
961}