events.rs

  1use std::sync::{Arc, OnceLock};
  2
  3use anyhow::{anyhow, Context};
  4use aws_sdk_s3::primitives::ByteStream;
  5use axum::{
  6    body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
  7};
  8use hyper::{HeaderMap, StatusCode};
  9use serde::{Serialize, Serializer};
 10use sha2::{Digest, Sha256};
 11use telemetry_events::{
 12    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
 13    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 14};
 15use util::SemanticVersion;
 16
 17use crate::{api::slack, AppState, Error, Result};
 18
 19use super::ips_file::IpsFile;
 20
 21pub fn router() -> Router {
 22    Router::new()
 23        .route("/telemetry/events", post(post_events))
 24        .route("/telemetry/crashes", post(post_crash))
 25}
 26
 27pub struct ZedChecksumHeader(Vec<u8>);
 28
 29impl Header for ZedChecksumHeader {
 30    fn name() -> &'static HeaderName {
 31        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 32        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 33    }
 34
 35    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 36    where
 37        Self: Sized,
 38        I: Iterator<Item = &'i axum::http::HeaderValue>,
 39    {
 40        let checksum = values
 41            .next()
 42            .ok_or_else(axum::headers::Error::invalid)?
 43            .to_str()
 44            .map_err(|_| axum::headers::Error::invalid())?;
 45
 46        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 47        Ok(Self(bytes))
 48    }
 49
 50    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 51        unimplemented!()
 52    }
 53}
 54
 55pub struct CloudflareIpCountryHeader(String);
 56
 57impl Header for CloudflareIpCountryHeader {
 58    fn name() -> &'static HeaderName {
 59        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
 60        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
 61    }
 62
 63    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 64    where
 65        Self: Sized,
 66        I: Iterator<Item = &'i axum::http::HeaderValue>,
 67    {
 68        let country_code = values
 69            .next()
 70            .ok_or_else(axum::headers::Error::invalid)?
 71            .to_str()
 72            .map_err(|_| axum::headers::Error::invalid())?;
 73
 74        Ok(Self(country_code.to_string()))
 75    }
 76
 77    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 78        unimplemented!()
 79    }
 80}
 81
 82pub async fn post_crash(
 83    Extension(app): Extension<Arc<AppState>>,
 84    body: Bytes,
 85    headers: HeaderMap,
 86) -> Result<()> {
 87    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 88
 89    let report = IpsFile::parse(&body)?;
 90    let version_threshold = SemanticVersion::new(0, 123, 0);
 91
 92    let bundle_id = &report.header.bundle_id;
 93    let app_version = &report.app_version();
 94
 95    if bundle_id == "dev.zed.Zed-Dev" {
 96        log::error!("Crash uploads from {} are ignored.", bundle_id);
 97        return Ok(());
 98    }
 99
100    if app_version.is_none() || app_version.unwrap() < version_threshold {
101        log::error!(
102            "Crash uploads from {} are ignored.",
103            report.header.app_version
104        );
105        return Ok(());
106    }
107    let app_version = app_version.unwrap();
108
109    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
110        let response = blob_store_client
111            .head_object()
112            .bucket(CRASH_REPORTS_BUCKET)
113            .key(report.header.incident_id.clone() + ".ips")
114            .send()
115            .await;
116
117        if response.is_ok() {
118            log::info!("We've already uploaded this crash");
119            return Ok(());
120        }
121
122        blob_store_client
123            .put_object()
124            .bucket(CRASH_REPORTS_BUCKET)
125            .key(report.header.incident_id.clone() + ".ips")
126            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
127            .body(ByteStream::from(body.to_vec()))
128            .send()
129            .await
130            .map_err(|e| log::error!("Failed to upload crash: {}", e))
131            .ok();
132    }
133
134    let recent_panic_on: Option<i64> = headers
135        .get("x-zed-panicked-on")
136        .and_then(|h| h.to_str().ok())
137        .and_then(|s| s.parse().ok());
138    let mut recent_panic = None;
139
140    if let Some(recent_panic_on) = recent_panic_on {
141        let crashed_at = match report.timestamp() {
142            Ok(t) => Some(t),
143            Err(e) => {
144                log::error!("Can't parse {}: {}", report.header.timestamp, e);
145                None
146            }
147        };
148        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
149            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
150        }
151    }
152
153    let description = report.description(recent_panic);
154    let summary = report.backtrace_summary();
155
156    tracing::error!(
157        service = "client",
158        version = %report.header.app_version,
159        os_version = %report.header.os_version,
160        bundle_id = %report.header.bundle_id,
161        incident_id = %report.header.incident_id,
162        description = %description,
163        backtrace = %summary,
164        "crash report");
165
166    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
167        let payload = slack::WebhookBody::new(|w| {
168            w.add_section(|s| s.text(slack::Text::markdown(description)))
169                .add_section(|s| {
170                    s.add_field(slack::Text::markdown(format!(
171                        "*Version:*\n{} ({})",
172                        bundle_id, app_version
173                    )))
174                    .add_field({
175                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
176                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
177                            hostname.strip_prefix("http://").unwrap_or_default()
178                        });
179
180                        slack::Text::markdown(format!(
181                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
182                            CRASH_REPORTS_BUCKET,
183                            hostname,
184                            report.header.incident_id,
185                            report
186                                .header
187                                .incident_id
188                                .chars()
189                                .take(8)
190                                .collect::<String>(),
191                        ))
192                    })
193                })
194                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
195        });
196        let payload_json = serde_json::to_string(&payload).map_err(|err| {
197            log::error!("Failed to serialize payload to JSON: {err}");
198            Error::Internal(anyhow!(err))
199        })?;
200
201        reqwest::Client::new()
202            .post(slack_panics_webhook)
203            .header("Content-Type", "application/json")
204            .body(payload_json)
205            .send()
206            .await
207            .map_err(|err| {
208                log::error!("Failed to send payload to Slack: {err}");
209                Error::Internal(anyhow!(err))
210            })?;
211    }
212
213    Ok(())
214}
215
216pub async fn post_events(
217    Extension(app): Extension<Arc<AppState>>,
218    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
219    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
220    body: Bytes,
221) -> Result<()> {
222    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
223        Err(Error::Http(
224            StatusCode::NOT_IMPLEMENTED,
225            "not supported".into(),
226        ))?
227    };
228
229    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
230        return Err(Error::Http(
231            StatusCode::INTERNAL_SERVER_ERROR,
232            "events not enabled".into(),
233        ))?;
234    };
235
236    let mut summer = Sha256::new();
237    summer.update(checksum_seed);
238    summer.update(&body);
239    summer.update(checksum_seed);
240
241    if &checksum != &summer.finalize()[..] {
242        return Err(Error::Http(
243            StatusCode::BAD_REQUEST,
244            "invalid checksum".into(),
245        ))?;
246    }
247
248    let request_body: telemetry_events::EventRequestBody =
249        serde_json::from_slice(&body).map_err(|err| {
250            log::error!("can't parse event json: {err}");
251            Error::Internal(anyhow!(err))
252        })?;
253
254    let mut to_upload = ToUpload::default();
255    let Some(last_event) = request_body.events.last() else {
256        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
257    };
258    let country_code = country_code_header.map(|h| h.0 .0);
259
260    let first_event_at = chrono::Utc::now()
261        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
262
263    for wrapper in &request_body.events {
264        match &wrapper.event {
265            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
266                event.clone(),
267                &wrapper,
268                &request_body,
269                first_event_at,
270                country_code.clone(),
271            )),
272            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
273                event.clone(),
274                &wrapper,
275                &request_body,
276                first_event_at,
277                country_code.clone(),
278            )),
279            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
280                event.clone(),
281                &wrapper,
282                &request_body,
283                first_event_at,
284            )),
285            Event::Assistant(event) => {
286                to_upload
287                    .assistant_events
288                    .push(AssistantEventRow::from_event(
289                        event.clone(),
290                        &wrapper,
291                        &request_body,
292                        first_event_at,
293                    ))
294            }
295            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
296                event.clone(),
297                &wrapper,
298                &request_body,
299                first_event_at,
300            )),
301            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
302                event.clone(),
303                &wrapper,
304                &request_body,
305                first_event_at,
306            )),
307            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
308                event.clone(),
309                &wrapper,
310                &request_body,
311                first_event_at,
312            )),
313            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
314                event.clone(),
315                &wrapper,
316                &request_body,
317                first_event_at,
318            )),
319            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
320                event.clone(),
321                &wrapper,
322                &request_body,
323                first_event_at,
324            )),
325            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
326                event.clone(),
327                &wrapper,
328                &request_body,
329                first_event_at,
330            )),
331        }
332    }
333
334    to_upload
335        .upload(&clickhouse_client)
336        .await
337        .map_err(|err| Error::Internal(anyhow!(err)))?;
338
339    Ok(())
340}
341
342#[derive(Default)]
343struct ToUpload {
344    editor_events: Vec<EditorEventRow>,
345    copilot_events: Vec<CopilotEventRow>,
346    assistant_events: Vec<AssistantEventRow>,
347    call_events: Vec<CallEventRow>,
348    cpu_events: Vec<CpuEventRow>,
349    memory_events: Vec<MemoryEventRow>,
350    app_events: Vec<AppEventRow>,
351    setting_events: Vec<SettingEventRow>,
352    edit_events: Vec<EditEventRow>,
353    action_events: Vec<ActionEventRow>,
354}
355
356impl ToUpload {
357    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
358        Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
359            .await
360            .with_context(|| format!("failed to upload to table 'editor_events'"))?;
361        Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
362            .await
363            .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
364        Self::upload_to_table(
365            "assistant_events",
366            &self.assistant_events,
367            clickhouse_client,
368        )
369        .await
370        .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
371        Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
372            .await
373            .with_context(|| format!("failed to upload to table 'call_events'"))?;
374        Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
375            .await
376            .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
377        Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
378            .await
379            .with_context(|| format!("failed to upload to table 'memory_events'"))?;
380        Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
381            .await
382            .with_context(|| format!("failed to upload to table 'app_events'"))?;
383        Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
384            .await
385            .with_context(|| format!("failed to upload to table 'setting_events'"))?;
386        Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
387            .await
388            .with_context(|| format!("failed to upload to table 'edit_events'"))?;
389        Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
390            .await
391            .with_context(|| format!("failed to upload to table 'action_events'"))?;
392        Ok(())
393    }
394
395    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
396        table: &str,
397        rows: &[T],
398        clickhouse_client: &clickhouse::Client,
399    ) -> anyhow::Result<()> {
400        if !rows.is_empty() {
401            let mut insert = clickhouse_client.insert(table)?;
402
403            for event in rows {
404                insert.write(event).await?;
405            }
406
407            insert.end().await?;
408        }
409
410        Ok(())
411    }
412}
413
414pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
415where
416    S: Serializer,
417{
418    if country_code.len() != 2 {
419        use serde::ser::Error;
420        return Err(S::Error::custom(
421            "country_code must be exactly 2 characters",
422        ));
423    }
424
425    let country_code = country_code.as_bytes();
426
427    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
428}
429
430#[derive(Serialize, Debug, clickhouse::Row)]
431pub struct EditorEventRow {
432    pub installation_id: String,
433    pub operation: String,
434    pub app_version: String,
435    pub file_extension: String,
436    pub os_name: String,
437    pub os_version: String,
438    pub release_channel: String,
439    pub signed_in: bool,
440    pub vim_mode: bool,
441    #[serde(serialize_with = "serialize_country_code")]
442    pub country_code: String,
443    pub region_code: String,
444    pub city: String,
445    pub time: i64,
446    pub copilot_enabled: bool,
447    pub copilot_enabled_for_language: bool,
448    pub historical_event: bool,
449    pub architecture: String,
450    pub is_staff: Option<bool>,
451    pub session_id: Option<String>,
452    pub major: Option<i32>,
453    pub minor: Option<i32>,
454    pub patch: Option<i32>,
455}
456
457impl EditorEventRow {
458    fn from_event(
459        event: EditorEvent,
460        wrapper: &EventWrapper,
461        body: &EventRequestBody,
462        first_event_at: chrono::DateTime<chrono::Utc>,
463        country_code: Option<String>,
464    ) -> Self {
465        let semver = body.semver();
466        let time =
467            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
468
469        Self {
470            app_version: body.app_version.clone(),
471            major: semver.map(|s| s.major as i32),
472            minor: semver.map(|s| s.minor as i32),
473            patch: semver.map(|s| s.patch as i32),
474            release_channel: body.release_channel.clone().unwrap_or_default(),
475            os_name: body.os_name.clone(),
476            os_version: body.os_version.clone().unwrap_or_default(),
477            architecture: body.architecture.clone(),
478            installation_id: body.installation_id.clone().unwrap_or_default(),
479            session_id: body.session_id.clone(),
480            is_staff: body.is_staff,
481            time: time.timestamp_millis(),
482            operation: event.operation,
483            file_extension: event.file_extension.unwrap_or_default(),
484            signed_in: wrapper.signed_in,
485            vim_mode: event.vim_mode,
486            copilot_enabled: event.copilot_enabled,
487            copilot_enabled_for_language: event.copilot_enabled_for_language,
488            country_code: country_code.unwrap_or("XX".to_string()),
489            region_code: "".to_string(),
490            city: "".to_string(),
491            historical_event: false,
492        }
493    }
494}
495
496#[derive(Serialize, Debug, clickhouse::Row)]
497pub struct CopilotEventRow {
498    pub installation_id: String,
499    pub suggestion_id: String,
500    pub suggestion_accepted: bool,
501    pub app_version: String,
502    pub file_extension: String,
503    pub os_name: String,
504    pub os_version: String,
505    pub release_channel: String,
506    pub signed_in: bool,
507    #[serde(serialize_with = "serialize_country_code")]
508    pub country_code: String,
509    pub region_code: String,
510    pub city: String,
511    pub time: i64,
512    pub is_staff: Option<bool>,
513    pub session_id: Option<String>,
514    pub major: Option<i32>,
515    pub minor: Option<i32>,
516    pub patch: Option<i32>,
517}
518
519impl CopilotEventRow {
520    fn from_event(
521        event: CopilotEvent,
522        wrapper: &EventWrapper,
523        body: &EventRequestBody,
524        first_event_at: chrono::DateTime<chrono::Utc>,
525        country_code: Option<String>,
526    ) -> Self {
527        let semver = body.semver();
528        let time =
529            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
530
531        Self {
532            app_version: body.app_version.clone(),
533            major: semver.map(|s| s.major as i32),
534            minor: semver.map(|s| s.minor as i32),
535            patch: semver.map(|s| s.patch as i32),
536            release_channel: body.release_channel.clone().unwrap_or_default(),
537            os_name: body.os_name.clone(),
538            os_version: body.os_version.clone().unwrap_or_default(),
539            installation_id: body.installation_id.clone().unwrap_or_default(),
540            session_id: body.session_id.clone(),
541            is_staff: body.is_staff,
542            time: time.timestamp_millis(),
543            file_extension: event.file_extension.unwrap_or_default(),
544            signed_in: wrapper.signed_in,
545            country_code: country_code.unwrap_or("XX".to_string()),
546            region_code: "".to_string(),
547            city: "".to_string(),
548            suggestion_id: event.suggestion_id.unwrap_or_default(),
549            suggestion_accepted: event.suggestion_accepted,
550        }
551    }
552}
553
554#[derive(Serialize, Debug, clickhouse::Row)]
555pub struct CallEventRow {
556    // AppInfoBase
557    app_version: String,
558    major: Option<i32>,
559    minor: Option<i32>,
560    patch: Option<i32>,
561    release_channel: String,
562
563    // ClientEventBase
564    installation_id: String,
565    session_id: Option<String>,
566    is_staff: Option<bool>,
567    time: i64,
568
569    // CallEventRow
570    operation: String,
571    room_id: Option<u64>,
572    channel_id: Option<u64>,
573}
574
575impl CallEventRow {
576    fn from_event(
577        event: CallEvent,
578        wrapper: &EventWrapper,
579        body: &EventRequestBody,
580        first_event_at: chrono::DateTime<chrono::Utc>,
581    ) -> Self {
582        let semver = body.semver();
583        let time =
584            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
585
586        Self {
587            app_version: body.app_version.clone(),
588            major: semver.map(|s| s.major as i32),
589            minor: semver.map(|s| s.minor as i32),
590            patch: semver.map(|s| s.patch as i32),
591            release_channel: body.release_channel.clone().unwrap_or_default(),
592            installation_id: body.installation_id.clone().unwrap_or_default(),
593            session_id: body.session_id.clone(),
594            is_staff: body.is_staff,
595            time: time.timestamp_millis(),
596            operation: event.operation,
597            room_id: event.room_id,
598            channel_id: event.channel_id,
599        }
600    }
601}
602
603#[derive(Serialize, Debug, clickhouse::Row)]
604pub struct AssistantEventRow {
605    // AppInfoBase
606    app_version: String,
607    major: Option<i32>,
608    minor: Option<i32>,
609    patch: Option<i32>,
610    release_channel: String,
611
612    // ClientEventBase
613    installation_id: Option<String>,
614    session_id: Option<String>,
615    is_staff: Option<bool>,
616    time: i64,
617
618    // AssistantEventRow
619    conversation_id: String,
620    kind: String,
621    model: String,
622}
623
624impl AssistantEventRow {
625    fn from_event(
626        event: AssistantEvent,
627        wrapper: &EventWrapper,
628        body: &EventRequestBody,
629        first_event_at: chrono::DateTime<chrono::Utc>,
630    ) -> Self {
631        let semver = body.semver();
632        let time =
633            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
634
635        Self {
636            app_version: body.app_version.clone(),
637            major: semver.map(|s| s.major as i32),
638            minor: semver.map(|s| s.minor as i32),
639            patch: semver.map(|s| s.patch as i32),
640            release_channel: body.release_channel.clone().unwrap_or_default(),
641            installation_id: body.installation_id.clone(),
642            session_id: body.session_id.clone(),
643            is_staff: body.is_staff,
644            time: time.timestamp_millis(),
645            conversation_id: event.conversation_id.unwrap_or_default(),
646            kind: event.kind.to_string(),
647            model: event.model,
648        }
649    }
650}
651
652#[derive(Debug, clickhouse::Row, Serialize)]
653pub struct CpuEventRow {
654    pub installation_id: Option<String>,
655    pub is_staff: Option<bool>,
656    pub usage_as_percentage: f32,
657    pub core_count: u32,
658    pub app_version: String,
659    pub release_channel: String,
660    pub time: i64,
661    pub session_id: Option<String>,
662    // pub normalized_cpu_usage: f64, MATERIALIZED
663    pub major: Option<i32>,
664    pub minor: Option<i32>,
665    pub patch: Option<i32>,
666}
667
668impl CpuEventRow {
669    fn from_event(
670        event: CpuEvent,
671        wrapper: &EventWrapper,
672        body: &EventRequestBody,
673        first_event_at: chrono::DateTime<chrono::Utc>,
674    ) -> Self {
675        let semver = body.semver();
676        let time =
677            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
678
679        Self {
680            app_version: body.app_version.clone(),
681            major: semver.map(|s| s.major as i32),
682            minor: semver.map(|s| s.minor as i32),
683            patch: semver.map(|s| s.patch as i32),
684            release_channel: body.release_channel.clone().unwrap_or_default(),
685            installation_id: body.installation_id.clone(),
686            session_id: body.session_id.clone(),
687            is_staff: body.is_staff,
688            time: time.timestamp_millis(),
689            usage_as_percentage: event.usage_as_percentage,
690            core_count: event.core_count,
691        }
692    }
693}
694
695#[derive(Serialize, Debug, clickhouse::Row)]
696pub struct MemoryEventRow {
697    // AppInfoBase
698    app_version: String,
699    major: Option<i32>,
700    minor: Option<i32>,
701    patch: Option<i32>,
702    release_channel: String,
703
704    // ClientEventBase
705    installation_id: Option<String>,
706    session_id: Option<String>,
707    is_staff: Option<bool>,
708    time: i64,
709
710    // MemoryEventRow
711    memory_in_bytes: u64,
712    virtual_memory_in_bytes: u64,
713}
714
715impl MemoryEventRow {
716    fn from_event(
717        event: MemoryEvent,
718        wrapper: &EventWrapper,
719        body: &EventRequestBody,
720        first_event_at: chrono::DateTime<chrono::Utc>,
721    ) -> Self {
722        let semver = body.semver();
723        let time =
724            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
725
726        Self {
727            app_version: body.app_version.clone(),
728            major: semver.map(|s| s.major as i32),
729            minor: semver.map(|s| s.minor as i32),
730            patch: semver.map(|s| s.patch as i32),
731            release_channel: body.release_channel.clone().unwrap_or_default(),
732            installation_id: body.installation_id.clone(),
733            session_id: body.session_id.clone(),
734            is_staff: body.is_staff,
735            time: time.timestamp_millis(),
736            memory_in_bytes: event.memory_in_bytes,
737            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
738        }
739    }
740}
741
742#[derive(Serialize, Debug, clickhouse::Row)]
743pub struct AppEventRow {
744    // AppInfoBase
745    app_version: String,
746    major: Option<i32>,
747    minor: Option<i32>,
748    patch: Option<i32>,
749    release_channel: String,
750
751    // ClientEventBase
752    installation_id: Option<String>,
753    session_id: Option<String>,
754    is_staff: Option<bool>,
755    time: i64,
756
757    // AppEventRow
758    operation: String,
759}
760
761impl AppEventRow {
762    fn from_event(
763        event: AppEvent,
764        wrapper: &EventWrapper,
765        body: &EventRequestBody,
766        first_event_at: chrono::DateTime<chrono::Utc>,
767    ) -> Self {
768        let semver = body.semver();
769        let time =
770            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
771
772        Self {
773            app_version: body.app_version.clone(),
774            major: semver.map(|s| s.major as i32),
775            minor: semver.map(|s| s.minor as i32),
776            patch: semver.map(|s| s.patch as i32),
777            release_channel: body.release_channel.clone().unwrap_or_default(),
778            installation_id: body.installation_id.clone(),
779            session_id: body.session_id.clone(),
780            is_staff: body.is_staff,
781            time: time.timestamp_millis(),
782            operation: event.operation,
783        }
784    }
785}
786
787#[derive(Serialize, Debug, clickhouse::Row)]
788pub struct SettingEventRow {
789    // AppInfoBase
790    app_version: String,
791    major: Option<i32>,
792    minor: Option<i32>,
793    patch: Option<i32>,
794    release_channel: String,
795
796    // ClientEventBase
797    installation_id: Option<String>,
798    session_id: Option<String>,
799    is_staff: Option<bool>,
800    time: i64,
801    // SettingEventRow
802    setting: String,
803    value: String,
804}
805
806impl SettingEventRow {
807    fn from_event(
808        event: SettingEvent,
809        wrapper: &EventWrapper,
810        body: &EventRequestBody,
811        first_event_at: chrono::DateTime<chrono::Utc>,
812    ) -> Self {
813        let semver = body.semver();
814        let time =
815            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
816
817        Self {
818            app_version: body.app_version.clone(),
819            major: semver.map(|s| s.major as i32),
820            minor: semver.map(|s| s.minor as i32),
821            patch: semver.map(|s| s.patch as i32),
822            release_channel: body.release_channel.clone().unwrap_or_default(),
823            installation_id: body.installation_id.clone(),
824            session_id: body.session_id.clone(),
825            is_staff: body.is_staff,
826            time: time.timestamp_millis(),
827            setting: event.setting,
828            value: event.value,
829        }
830    }
831}
832
833#[derive(Serialize, Debug, clickhouse::Row)]
834pub struct EditEventRow {
835    // AppInfoBase
836    app_version: String,
837    major: Option<i32>,
838    minor: Option<i32>,
839    patch: Option<i32>,
840    release_channel: String,
841
842    // ClientEventBase
843    installation_id: Option<String>,
844    // Note: This column name has a typo in the ClickHouse table.
845    #[serde(rename = "sesssion_id")]
846    session_id: Option<String>,
847    is_staff: Option<bool>,
848    time: i64,
849
850    // EditEventRow
851    period_start: i64,
852    period_end: i64,
853    environment: String,
854}
855
856impl EditEventRow {
857    fn from_event(
858        event: EditEvent,
859        wrapper: &EventWrapper,
860        body: &EventRequestBody,
861        first_event_at: chrono::DateTime<chrono::Utc>,
862    ) -> Self {
863        let semver = body.semver();
864        let time =
865            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
866
867        let period_start = time - chrono::Duration::milliseconds(event.duration);
868        let period_end = time;
869
870        Self {
871            app_version: body.app_version.clone(),
872            major: semver.map(|s| s.major as i32),
873            minor: semver.map(|s| s.minor as i32),
874            patch: semver.map(|s| s.patch as i32),
875            release_channel: body.release_channel.clone().unwrap_or_default(),
876            installation_id: body.installation_id.clone(),
877            session_id: body.session_id.clone(),
878            is_staff: body.is_staff,
879            time: time.timestamp_millis(),
880            period_start: period_start.timestamp_millis(),
881            period_end: period_end.timestamp_millis(),
882            environment: event.environment,
883        }
884    }
885}
886
887#[derive(Serialize, Debug, clickhouse::Row)]
888pub struct ActionEventRow {
889    // AppInfoBase
890    app_version: String,
891    major: Option<i32>,
892    minor: Option<i32>,
893    patch: Option<i32>,
894    release_channel: String,
895
896    // ClientEventBase
897    installation_id: Option<String>,
898    // Note: This column name has a typo in the ClickHouse table.
899    #[serde(rename = "sesssion_id")]
900    session_id: Option<String>,
901    is_staff: Option<bool>,
902    time: i64,
903    // ActionEventRow
904    source: String,
905    action: String,
906}
907
908impl ActionEventRow {
909    fn from_event(
910        event: ActionEvent,
911        wrapper: &EventWrapper,
912        body: &EventRequestBody,
913        first_event_at: chrono::DateTime<chrono::Utc>,
914    ) -> Self {
915        let semver = body.semver();
916        let time =
917            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
918
919        Self {
920            app_version: body.app_version.clone(),
921            major: semver.map(|s| s.major as i32),
922            minor: semver.map(|s| s.minor as i32),
923            patch: semver.map(|s| s.patch as i32),
924            release_channel: body.release_channel.clone().unwrap_or_default(),
925            installation_id: body.installation_id.clone(),
926            session_id: body.session_id.clone(),
927            is_staff: body.is_staff,
928            time: time.timestamp_millis(),
929            source: event.source,
930            action: event.action,
931        }
932    }
933}