events.rs

  1use std::sync::{Arc, OnceLock};
  2
  3use anyhow::{anyhow, Context};
  4use aws_sdk_s3::primitives::ByteStream;
  5use axum::{
  6    body::Bytes,
  7    headers::Header,
  8    http::{HeaderMap, HeaderName, StatusCode},
  9    routing::post,
 10    Extension, Router, TypedHeader,
 11};
 12use serde::{Serialize, Serializer};
 13use sha2::{Digest, Sha256};
 14use telemetry_events::{
 15    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
 16    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 17};
 18use util::SemanticVersion;
 19
 20use crate::{api::slack, AppState, Error, Result};
 21
 22use super::ips_file::IpsFile;
 23
 24pub fn router() -> Router {
 25    Router::new()
 26        .route("/telemetry/events", post(post_events))
 27        .route("/telemetry/crashes", post(post_crash))
 28}
 29
 30pub struct ZedChecksumHeader(Vec<u8>);
 31
 32impl Header for ZedChecksumHeader {
 33    fn name() -> &'static HeaderName {
 34        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 35        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 36    }
 37
 38    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 39    where
 40        Self: Sized,
 41        I: Iterator<Item = &'i axum::http::HeaderValue>,
 42    {
 43        let checksum = values
 44            .next()
 45            .ok_or_else(axum::headers::Error::invalid)?
 46            .to_str()
 47            .map_err(|_| axum::headers::Error::invalid())?;
 48
 49        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 50        Ok(Self(bytes))
 51    }
 52
 53    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 54        unimplemented!()
 55    }
 56}
 57
 58pub struct CloudflareIpCountryHeader(String);
 59
 60impl Header for CloudflareIpCountryHeader {
 61    fn name() -> &'static HeaderName {
 62        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
 63        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
 64    }
 65
 66    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 67    where
 68        Self: Sized,
 69        I: Iterator<Item = &'i axum::http::HeaderValue>,
 70    {
 71        let country_code = values
 72            .next()
 73            .ok_or_else(axum::headers::Error::invalid)?
 74            .to_str()
 75            .map_err(|_| axum::headers::Error::invalid())?;
 76
 77        Ok(Self(country_code.to_string()))
 78    }
 79
 80    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 81        unimplemented!()
 82    }
 83}
 84
 85pub async fn post_crash(
 86    Extension(app): Extension<Arc<AppState>>,
 87    headers: HeaderMap,
 88    body: Bytes,
 89) -> Result<()> {
 90    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 91
 92    let report = IpsFile::parse(&body)?;
 93    let version_threshold = SemanticVersion::new(0, 123, 0);
 94
 95    let bundle_id = &report.header.bundle_id;
 96    let app_version = &report.app_version();
 97
 98    if bundle_id == "dev.zed.Zed-Dev" {
 99        log::error!("Crash uploads from {} are ignored.", bundle_id);
100        return Ok(());
101    }
102
103    if app_version.is_none() || app_version.unwrap() < version_threshold {
104        log::error!(
105            "Crash uploads from {} are ignored.",
106            report.header.app_version
107        );
108        return Ok(());
109    }
110    let app_version = app_version.unwrap();
111
112    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
113        let response = blob_store_client
114            .head_object()
115            .bucket(CRASH_REPORTS_BUCKET)
116            .key(report.header.incident_id.clone() + ".ips")
117            .send()
118            .await;
119
120        if response.is_ok() {
121            log::info!("We've already uploaded this crash");
122            return Ok(());
123        }
124
125        blob_store_client
126            .put_object()
127            .bucket(CRASH_REPORTS_BUCKET)
128            .key(report.header.incident_id.clone() + ".ips")
129            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
130            .body(ByteStream::from(body.to_vec()))
131            .send()
132            .await
133            .map_err(|e| log::error!("Failed to upload crash: {}", e))
134            .ok();
135    }
136
137    let recent_panic_on: Option<i64> = headers
138        .get("x-zed-panicked-on")
139        .and_then(|h| h.to_str().ok())
140        .and_then(|s| s.parse().ok());
141    let mut recent_panic = None;
142
143    if let Some(recent_panic_on) = recent_panic_on {
144        let crashed_at = match report.timestamp() {
145            Ok(t) => Some(t),
146            Err(e) => {
147                log::error!("Can't parse {}: {}", report.header.timestamp, e);
148                None
149            }
150        };
151        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
152            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
153        }
154    }
155
156    let description = report.description(recent_panic);
157    let summary = report.backtrace_summary();
158
159    tracing::error!(
160        service = "client",
161        version = %report.header.app_version,
162        os_version = %report.header.os_version,
163        bundle_id = %report.header.bundle_id,
164        incident_id = %report.header.incident_id,
165        description = %description,
166        backtrace = %summary,
167        "crash report");
168
169    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
170        let payload = slack::WebhookBody::new(|w| {
171            w.add_section(|s| s.text(slack::Text::markdown(description)))
172                .add_section(|s| {
173                    s.add_field(slack::Text::markdown(format!(
174                        "*Version:*\n{} ({})",
175                        bundle_id, app_version
176                    )))
177                    .add_field({
178                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
179                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
180                            hostname.strip_prefix("http://").unwrap_or_default()
181                        });
182
183                        slack::Text::markdown(format!(
184                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
185                            CRASH_REPORTS_BUCKET,
186                            hostname,
187                            report.header.incident_id,
188                            report
189                                .header
190                                .incident_id
191                                .chars()
192                                .take(8)
193                                .collect::<String>(),
194                        ))
195                    })
196                })
197                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
198        });
199        let payload_json = serde_json::to_string(&payload).map_err(|err| {
200            log::error!("Failed to serialize payload to JSON: {err}");
201            Error::Internal(anyhow!(err))
202        })?;
203
204        reqwest::Client::new()
205            .post(slack_panics_webhook)
206            .header("Content-Type", "application/json")
207            .body(payload_json)
208            .send()
209            .await
210            .map_err(|err| {
211                log::error!("Failed to send payload to Slack: {err}");
212                Error::Internal(anyhow!(err))
213            })?;
214    }
215
216    Ok(())
217}
218
219pub async fn post_events(
220    Extension(app): Extension<Arc<AppState>>,
221    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
222    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
223    body: Bytes,
224) -> Result<()> {
225    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
226        Err(Error::Http(
227            StatusCode::NOT_IMPLEMENTED,
228            "not supported".into(),
229        ))?
230    };
231
232    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
233        return Err(Error::Http(
234            StatusCode::INTERNAL_SERVER_ERROR,
235            "events not enabled".into(),
236        ))?;
237    };
238
239    let mut summer = Sha256::new();
240    summer.update(checksum_seed);
241    summer.update(&body);
242    summer.update(checksum_seed);
243
244    if &checksum != &summer.finalize()[..] {
245        return Err(Error::Http(
246            StatusCode::BAD_REQUEST,
247            "invalid checksum".into(),
248        ))?;
249    }
250
251    let request_body: telemetry_events::EventRequestBody =
252        serde_json::from_slice(&body).map_err(|err| {
253            log::error!("can't parse event json: {err}");
254            Error::Internal(anyhow!(err))
255        })?;
256
257    let mut to_upload = ToUpload::default();
258    let Some(last_event) = request_body.events.last() else {
259        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
260    };
261    let country_code = country_code_header.map(|h| h.0 .0);
262
263    let first_event_at = chrono::Utc::now()
264        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
265
266    for wrapper in &request_body.events {
267        match &wrapper.event {
268            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
269                event.clone(),
270                &wrapper,
271                &request_body,
272                first_event_at,
273                country_code.clone(),
274            )),
275            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
276                event.clone(),
277                &wrapper,
278                &request_body,
279                first_event_at,
280                country_code.clone(),
281            )),
282            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
283                event.clone(),
284                &wrapper,
285                &request_body,
286                first_event_at,
287            )),
288            Event::Assistant(event) => {
289                to_upload
290                    .assistant_events
291                    .push(AssistantEventRow::from_event(
292                        event.clone(),
293                        &wrapper,
294                        &request_body,
295                        first_event_at,
296                    ))
297            }
298            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
299                event.clone(),
300                &wrapper,
301                &request_body,
302                first_event_at,
303            )),
304            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
305                event.clone(),
306                &wrapper,
307                &request_body,
308                first_event_at,
309            )),
310            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
311                event.clone(),
312                &wrapper,
313                &request_body,
314                first_event_at,
315            )),
316            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
317                event.clone(),
318                &wrapper,
319                &request_body,
320                first_event_at,
321            )),
322            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
323                event.clone(),
324                &wrapper,
325                &request_body,
326                first_event_at,
327            )),
328            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
329                event.clone(),
330                &wrapper,
331                &request_body,
332                first_event_at,
333            )),
334        }
335    }
336
337    to_upload
338        .upload(&clickhouse_client)
339        .await
340        .map_err(|err| Error::Internal(anyhow!(err)))?;
341
342    Ok(())
343}
344
345#[derive(Default)]
346struct ToUpload {
347    editor_events: Vec<EditorEventRow>,
348    copilot_events: Vec<CopilotEventRow>,
349    assistant_events: Vec<AssistantEventRow>,
350    call_events: Vec<CallEventRow>,
351    cpu_events: Vec<CpuEventRow>,
352    memory_events: Vec<MemoryEventRow>,
353    app_events: Vec<AppEventRow>,
354    setting_events: Vec<SettingEventRow>,
355    edit_events: Vec<EditEventRow>,
356    action_events: Vec<ActionEventRow>,
357}
358
359impl ToUpload {
360    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
361        const EDITOR_EVENTS_TABLE: &str = "editor_events";
362        Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client)
363            .await
364            .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?;
365
366        const COPILOT_EVENTS_TABLE: &str = "copilot_events";
367        Self::upload_to_table(
368            COPILOT_EVENTS_TABLE,
369            &self.copilot_events,
370            clickhouse_client,
371        )
372        .await
373        .with_context(|| format!("failed to upload to table '{COPILOT_EVENTS_TABLE}'"))?;
374
375        const ASSISTANT_EVENTS_TABLE: &str = "assistant_events";
376        Self::upload_to_table(
377            ASSISTANT_EVENTS_TABLE,
378            &self.assistant_events,
379            clickhouse_client,
380        )
381        .await
382        .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?;
383
384        const CALL_EVENTS_TABLE: &str = "call_events";
385        Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client)
386            .await
387            .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?;
388
389        const CPU_EVENTS_TABLE: &str = "cpu_events";
390        Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client)
391            .await
392            .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?;
393
394        const MEMORY_EVENTS_TABLE: &str = "memory_events";
395        Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client)
396            .await
397            .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?;
398
399        const APP_EVENTS_TABLE: &str = "app_events";
400        Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client)
401            .await
402            .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?;
403
404        const SETTING_EVENTS_TABLE: &str = "setting_events";
405        Self::upload_to_table(
406            SETTING_EVENTS_TABLE,
407            &self.setting_events,
408            clickhouse_client,
409        )
410        .await
411        .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?;
412
413        const EDIT_EVENTS_TABLE: &str = "edit_events";
414        Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client)
415            .await
416            .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?;
417
418        const ACTION_EVENTS_TABLE: &str = "action_events";
419        Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client)
420            .await
421            .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
422
423        Ok(())
424    }
425
426    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
427        table: &str,
428        rows: &[T],
429        clickhouse_client: &clickhouse::Client,
430    ) -> anyhow::Result<()> {
431        if !rows.is_empty() {
432            let mut insert = clickhouse_client.insert(table)?;
433
434            for event in rows {
435                insert.write(event).await?;
436            }
437
438            insert.end().await?;
439        }
440
441        Ok(())
442    }
443}
444
445pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
446where
447    S: Serializer,
448{
449    if country_code.len() != 2 {
450        use serde::ser::Error;
451        return Err(S::Error::custom(
452            "country_code must be exactly 2 characters",
453        ));
454    }
455
456    let country_code = country_code.as_bytes();
457
458    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
459}
460
461#[derive(Serialize, Debug, clickhouse::Row)]
462pub struct EditorEventRow {
463    pub installation_id: String,
464    pub operation: String,
465    pub app_version: String,
466    pub file_extension: String,
467    pub os_name: String,
468    pub os_version: String,
469    pub release_channel: String,
470    pub signed_in: bool,
471    pub vim_mode: bool,
472    #[serde(serialize_with = "serialize_country_code")]
473    pub country_code: String,
474    pub region_code: String,
475    pub city: String,
476    pub time: i64,
477    pub copilot_enabled: bool,
478    pub copilot_enabled_for_language: bool,
479    pub historical_event: bool,
480    pub architecture: String,
481    pub is_staff: Option<bool>,
482    pub session_id: Option<String>,
483    pub major: Option<i32>,
484    pub minor: Option<i32>,
485    pub patch: Option<i32>,
486}
487
488impl EditorEventRow {
489    fn from_event(
490        event: EditorEvent,
491        wrapper: &EventWrapper,
492        body: &EventRequestBody,
493        first_event_at: chrono::DateTime<chrono::Utc>,
494        country_code: Option<String>,
495    ) -> Self {
496        let semver = body.semver();
497        let time =
498            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
499
500        Self {
501            app_version: body.app_version.clone(),
502            major: semver.map(|s| s.major as i32),
503            minor: semver.map(|s| s.minor as i32),
504            patch: semver.map(|s| s.patch as i32),
505            release_channel: body.release_channel.clone().unwrap_or_default(),
506            os_name: body.os_name.clone(),
507            os_version: body.os_version.clone().unwrap_or_default(),
508            architecture: body.architecture.clone(),
509            installation_id: body.installation_id.clone().unwrap_or_default(),
510            session_id: body.session_id.clone(),
511            is_staff: body.is_staff,
512            time: time.timestamp_millis(),
513            operation: event.operation,
514            file_extension: event.file_extension.unwrap_or_default(),
515            signed_in: wrapper.signed_in,
516            vim_mode: event.vim_mode,
517            copilot_enabled: event.copilot_enabled,
518            copilot_enabled_for_language: event.copilot_enabled_for_language,
519            country_code: country_code.unwrap_or("XX".to_string()),
520            region_code: "".to_string(),
521            city: "".to_string(),
522            historical_event: false,
523        }
524    }
525}
526
527#[derive(Serialize, Debug, clickhouse::Row)]
528pub struct CopilotEventRow {
529    pub installation_id: String,
530    pub suggestion_id: String,
531    pub suggestion_accepted: bool,
532    pub app_version: String,
533    pub file_extension: String,
534    pub os_name: String,
535    pub os_version: String,
536    pub release_channel: String,
537    pub signed_in: bool,
538    #[serde(serialize_with = "serialize_country_code")]
539    pub country_code: String,
540    pub region_code: String,
541    pub city: String,
542    pub time: i64,
543    pub is_staff: Option<bool>,
544    pub session_id: Option<String>,
545    pub major: Option<i32>,
546    pub minor: Option<i32>,
547    pub patch: Option<i32>,
548}
549
550impl CopilotEventRow {
551    fn from_event(
552        event: CopilotEvent,
553        wrapper: &EventWrapper,
554        body: &EventRequestBody,
555        first_event_at: chrono::DateTime<chrono::Utc>,
556        country_code: Option<String>,
557    ) -> Self {
558        let semver = body.semver();
559        let time =
560            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
561
562        Self {
563            app_version: body.app_version.clone(),
564            major: semver.map(|s| s.major as i32),
565            minor: semver.map(|s| s.minor as i32),
566            patch: semver.map(|s| s.patch as i32),
567            release_channel: body.release_channel.clone().unwrap_or_default(),
568            os_name: body.os_name.clone(),
569            os_version: body.os_version.clone().unwrap_or_default(),
570            installation_id: body.installation_id.clone().unwrap_or_default(),
571            session_id: body.session_id.clone(),
572            is_staff: body.is_staff,
573            time: time.timestamp_millis(),
574            file_extension: event.file_extension.unwrap_or_default(),
575            signed_in: wrapper.signed_in,
576            country_code: country_code.unwrap_or("XX".to_string()),
577            region_code: "".to_string(),
578            city: "".to_string(),
579            suggestion_id: event.suggestion_id.unwrap_or_default(),
580            suggestion_accepted: event.suggestion_accepted,
581        }
582    }
583}
584
585#[derive(Serialize, Debug, clickhouse::Row)]
586pub struct CallEventRow {
587    // AppInfoBase
588    app_version: String,
589    major: Option<i32>,
590    minor: Option<i32>,
591    patch: Option<i32>,
592    release_channel: String,
593
594    // ClientEventBase
595    installation_id: String,
596    session_id: Option<String>,
597    is_staff: Option<bool>,
598    time: i64,
599
600    // CallEventRow
601    operation: String,
602    room_id: Option<u64>,
603    channel_id: Option<u64>,
604}
605
606impl CallEventRow {
607    fn from_event(
608        event: CallEvent,
609        wrapper: &EventWrapper,
610        body: &EventRequestBody,
611        first_event_at: chrono::DateTime<chrono::Utc>,
612    ) -> Self {
613        let semver = body.semver();
614        let time =
615            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
616
617        Self {
618            app_version: body.app_version.clone(),
619            major: semver.map(|s| s.major as i32),
620            minor: semver.map(|s| s.minor as i32),
621            patch: semver.map(|s| s.patch as i32),
622            release_channel: body.release_channel.clone().unwrap_or_default(),
623            installation_id: body.installation_id.clone().unwrap_or_default(),
624            session_id: body.session_id.clone(),
625            is_staff: body.is_staff,
626            time: time.timestamp_millis(),
627            operation: event.operation,
628            room_id: event.room_id,
629            channel_id: event.channel_id,
630        }
631    }
632}
633
634#[derive(Serialize, Debug, clickhouse::Row)]
635pub struct AssistantEventRow {
636    // AppInfoBase
637    app_version: String,
638    major: Option<i32>,
639    minor: Option<i32>,
640    patch: Option<i32>,
641    release_channel: String,
642
643    // ClientEventBase
644    installation_id: Option<String>,
645    session_id: Option<String>,
646    is_staff: Option<bool>,
647    time: i64,
648
649    // AssistantEventRow
650    conversation_id: String,
651    kind: String,
652    model: String,
653}
654
655impl AssistantEventRow {
656    fn from_event(
657        event: AssistantEvent,
658        wrapper: &EventWrapper,
659        body: &EventRequestBody,
660        first_event_at: chrono::DateTime<chrono::Utc>,
661    ) -> Self {
662        let semver = body.semver();
663        let time =
664            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
665
666        Self {
667            app_version: body.app_version.clone(),
668            major: semver.map(|s| s.major as i32),
669            minor: semver.map(|s| s.minor as i32),
670            patch: semver.map(|s| s.patch as i32),
671            release_channel: body.release_channel.clone().unwrap_or_default(),
672            installation_id: body.installation_id.clone(),
673            session_id: body.session_id.clone(),
674            is_staff: body.is_staff,
675            time: time.timestamp_millis(),
676            conversation_id: event.conversation_id.unwrap_or_default(),
677            kind: event.kind.to_string(),
678            model: event.model,
679        }
680    }
681}
682
683#[derive(Debug, clickhouse::Row, Serialize)]
684pub struct CpuEventRow {
685    pub installation_id: Option<String>,
686    pub is_staff: Option<bool>,
687    pub usage_as_percentage: f32,
688    pub core_count: u32,
689    pub app_version: String,
690    pub release_channel: String,
691    pub time: i64,
692    pub session_id: Option<String>,
693    // pub normalized_cpu_usage: f64, MATERIALIZED
694    pub major: Option<i32>,
695    pub minor: Option<i32>,
696    pub patch: Option<i32>,
697}
698
699impl CpuEventRow {
700    fn from_event(
701        event: CpuEvent,
702        wrapper: &EventWrapper,
703        body: &EventRequestBody,
704        first_event_at: chrono::DateTime<chrono::Utc>,
705    ) -> Self {
706        let semver = body.semver();
707        let time =
708            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
709
710        Self {
711            app_version: body.app_version.clone(),
712            major: semver.map(|s| s.major as i32),
713            minor: semver.map(|s| s.minor as i32),
714            patch: semver.map(|s| s.patch as i32),
715            release_channel: body.release_channel.clone().unwrap_or_default(),
716            installation_id: body.installation_id.clone(),
717            session_id: body.session_id.clone(),
718            is_staff: body.is_staff,
719            time: time.timestamp_millis(),
720            usage_as_percentage: event.usage_as_percentage,
721            core_count: event.core_count,
722        }
723    }
724}
725
726#[derive(Serialize, Debug, clickhouse::Row)]
727pub struct MemoryEventRow {
728    // AppInfoBase
729    app_version: String,
730    major: Option<i32>,
731    minor: Option<i32>,
732    patch: Option<i32>,
733    release_channel: String,
734
735    // ClientEventBase
736    installation_id: Option<String>,
737    session_id: Option<String>,
738    is_staff: Option<bool>,
739    time: i64,
740
741    // MemoryEventRow
742    memory_in_bytes: u64,
743    virtual_memory_in_bytes: u64,
744}
745
746impl MemoryEventRow {
747    fn from_event(
748        event: MemoryEvent,
749        wrapper: &EventWrapper,
750        body: &EventRequestBody,
751        first_event_at: chrono::DateTime<chrono::Utc>,
752    ) -> Self {
753        let semver = body.semver();
754        let time =
755            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
756
757        Self {
758            app_version: body.app_version.clone(),
759            major: semver.map(|s| s.major as i32),
760            minor: semver.map(|s| s.minor as i32),
761            patch: semver.map(|s| s.patch as i32),
762            release_channel: body.release_channel.clone().unwrap_or_default(),
763            installation_id: body.installation_id.clone(),
764            session_id: body.session_id.clone(),
765            is_staff: body.is_staff,
766            time: time.timestamp_millis(),
767            memory_in_bytes: event.memory_in_bytes,
768            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
769        }
770    }
771}
772
773#[derive(Serialize, Debug, clickhouse::Row)]
774pub struct AppEventRow {
775    // AppInfoBase
776    app_version: String,
777    major: Option<i32>,
778    minor: Option<i32>,
779    patch: Option<i32>,
780    release_channel: String,
781
782    // ClientEventBase
783    installation_id: Option<String>,
784    session_id: Option<String>,
785    is_staff: Option<bool>,
786    time: i64,
787
788    // AppEventRow
789    operation: String,
790}
791
792impl AppEventRow {
793    fn from_event(
794        event: AppEvent,
795        wrapper: &EventWrapper,
796        body: &EventRequestBody,
797        first_event_at: chrono::DateTime<chrono::Utc>,
798    ) -> Self {
799        let semver = body.semver();
800        let time =
801            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
802
803        Self {
804            app_version: body.app_version.clone(),
805            major: semver.map(|s| s.major as i32),
806            minor: semver.map(|s| s.minor as i32),
807            patch: semver.map(|s| s.patch as i32),
808            release_channel: body.release_channel.clone().unwrap_or_default(),
809            installation_id: body.installation_id.clone(),
810            session_id: body.session_id.clone(),
811            is_staff: body.is_staff,
812            time: time.timestamp_millis(),
813            operation: event.operation,
814        }
815    }
816}
817
818#[derive(Serialize, Debug, clickhouse::Row)]
819pub struct SettingEventRow {
820    // AppInfoBase
821    app_version: String,
822    major: Option<i32>,
823    minor: Option<i32>,
824    patch: Option<i32>,
825    release_channel: String,
826
827    // ClientEventBase
828    installation_id: Option<String>,
829    session_id: Option<String>,
830    is_staff: Option<bool>,
831    time: i64,
832    // SettingEventRow
833    setting: String,
834    value: String,
835}
836
837impl SettingEventRow {
838    fn from_event(
839        event: SettingEvent,
840        wrapper: &EventWrapper,
841        body: &EventRequestBody,
842        first_event_at: chrono::DateTime<chrono::Utc>,
843    ) -> Self {
844        let semver = body.semver();
845        let time =
846            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
847
848        Self {
849            app_version: body.app_version.clone(),
850            major: semver.map(|s| s.major as i32),
851            minor: semver.map(|s| s.minor as i32),
852            patch: semver.map(|s| s.patch as i32),
853            release_channel: body.release_channel.clone().unwrap_or_default(),
854            installation_id: body.installation_id.clone(),
855            session_id: body.session_id.clone(),
856            is_staff: body.is_staff,
857            time: time.timestamp_millis(),
858            setting: event.setting,
859            value: event.value,
860        }
861    }
862}
863
864#[derive(Serialize, Debug, clickhouse::Row)]
865pub struct EditEventRow {
866    // AppInfoBase
867    app_version: String,
868    major: Option<i32>,
869    minor: Option<i32>,
870    patch: Option<i32>,
871    release_channel: String,
872
873    // ClientEventBase
874    installation_id: Option<String>,
875    // Note: This column name has a typo in the ClickHouse table.
876    #[serde(rename = "sesssion_id")]
877    session_id: Option<String>,
878    is_staff: Option<bool>,
879    time: i64,
880
881    // EditEventRow
882    period_start: i64,
883    period_end: i64,
884    environment: String,
885}
886
887impl EditEventRow {
888    fn from_event(
889        event: EditEvent,
890        wrapper: &EventWrapper,
891        body: &EventRequestBody,
892        first_event_at: chrono::DateTime<chrono::Utc>,
893    ) -> Self {
894        let semver = body.semver();
895        let time =
896            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
897
898        let period_start = time - chrono::Duration::milliseconds(event.duration);
899        let period_end = time;
900
901        Self {
902            app_version: body.app_version.clone(),
903            major: semver.map(|s| s.major as i32),
904            minor: semver.map(|s| s.minor as i32),
905            patch: semver.map(|s| s.patch as i32),
906            release_channel: body.release_channel.clone().unwrap_or_default(),
907            installation_id: body.installation_id.clone(),
908            session_id: body.session_id.clone(),
909            is_staff: body.is_staff,
910            time: time.timestamp_millis(),
911            period_start: period_start.timestamp_millis(),
912            period_end: period_end.timestamp_millis(),
913            environment: event.environment,
914        }
915    }
916}
917
918#[derive(Serialize, Debug, clickhouse::Row)]
919pub struct ActionEventRow {
920    // AppInfoBase
921    app_version: String,
922    major: Option<i32>,
923    minor: Option<i32>,
924    patch: Option<i32>,
925    release_channel: String,
926
927    // ClientEventBase
928    installation_id: Option<String>,
929    // Note: This column name has a typo in the ClickHouse table.
930    #[serde(rename = "sesssion_id")]
931    session_id: Option<String>,
932    is_staff: Option<bool>,
933    time: i64,
934    // ActionEventRow
935    source: String,
936    action: String,
937}
938
939impl ActionEventRow {
940    fn from_event(
941        event: ActionEvent,
942        wrapper: &EventWrapper,
943        body: &EventRequestBody,
944        first_event_at: chrono::DateTime<chrono::Utc>,
945    ) -> Self {
946        let semver = body.semver();
947        let time =
948            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
949
950        Self {
951            app_version: body.app_version.clone(),
952            major: semver.map(|s| s.major as i32),
953            minor: semver.map(|s| s.minor as i32),
954            patch: semver.map(|s| s.patch as i32),
955            release_channel: body.release_channel.clone().unwrap_or_default(),
956            installation_id: body.installation_id.clone(),
957            session_id: body.session_id.clone(),
958            is_staff: body.is_staff,
959            time: time.timestamp_millis(),
960            source: event.source,
961            action: event.action,
962        }
963    }
964}