events.rs

  1use std::sync::Arc;
  2
  3use anyhow::{anyhow, Context};
  4use aws_sdk_s3::primitives::ByteStream;
  5use axum::{
  6    body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
  7};
  8use hyper::StatusCode;
  9use hyper::{HeaderMap, StatusCode};
 10use serde::{Serialize, Serializer};
 11use sha2::{Digest, Sha256};
 12use telemetry_events::{
 13    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
 14    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 15};
 16use util::SemanticVersion;
 17
 18use crate::{api::slack, AppState, Error, Result};
 19
 20use super::ips_file::IpsFile;
 21
 22pub fn router() -> Router {
 23    Router::new()
 24        .route("/telemetry/events", post(post_events))
 25        .route("/telemetry/crashes", post(post_crash))
 26}
 27
 28lazy_static! {
 29    static ref ZED_CHECKSUM_HEADER: HeaderName = HeaderName::from_static("x-zed-checksum");
 30    static ref CLOUDFLARE_IP_COUNTRY_HEADER: HeaderName = HeaderName::from_static("cf-ipcountry");
 31}
 32
 33pub struct ZedChecksumHeader(Vec<u8>);
 34
 35impl Header for ZedChecksumHeader {
 36    fn name() -> &'static HeaderName {
 37        &ZED_CHECKSUM_HEADER
 38    }
 39
 40    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 41    where
 42        Self: Sized,
 43        I: Iterator<Item = &'i axum::http::HeaderValue>,
 44    {
 45        let checksum = values
 46            .next()
 47            .ok_or_else(axum::headers::Error::invalid)?
 48            .to_str()
 49            .map_err(|_| axum::headers::Error::invalid())?;
 50
 51        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 52        Ok(Self(bytes))
 53    }
 54
 55    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 56        unimplemented!()
 57    }
 58}
 59
 60pub struct CloudflareIpCountryHeader(String);
 61
 62impl Header for CloudflareIpCountryHeader {
 63    fn name() -> &'static HeaderName {
 64        &CLOUDFLARE_IP_COUNTRY_HEADER
 65    }
 66
 67    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 68    where
 69        Self: Sized,
 70        I: Iterator<Item = &'i axum::http::HeaderValue>,
 71    {
 72        let country_code = values
 73            .next()
 74            .ok_or_else(axum::headers::Error::invalid)?
 75            .to_str()
 76            .map_err(|_| axum::headers::Error::invalid())?;
 77
 78        Ok(Self(country_code.to_string()))
 79    }
 80
 81    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 82        unimplemented!()
 83    }
 84}
 85
 86pub async fn post_crash(
 87    Extension(app): Extension<Arc<AppState>>,
 88    body: Bytes,
 89    headers: HeaderMap,
 90) -> Result<()> {
 91    static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
 92
 93    let report = IpsFile::parse(&body)?;
 94    let version_threshold = SemanticVersion::new(0, 123, 0);
 95
 96    let bundle_id = &report.header.bundle_id;
 97    let app_version = &report.app_version();
 98
 99    if bundle_id == "dev.zed.Zed-Dev" {
100        log::error!("Crash uploads from {} are ignored.", bundle_id);
101        return Ok(());
102    }
103
104    if app_version.is_none() || app_version.unwrap() < version_threshold {
105        log::error!(
106            "Crash uploads from {} are ignored.",
107            report.header.app_version
108        );
109        return Ok(());
110    }
111    let app_version = app_version.unwrap();
112
113    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
114        let response = blob_store_client
115            .head_object()
116            .bucket(CRASH_REPORTS_BUCKET)
117            .key(report.header.incident_id.clone() + ".ips")
118            .send()
119            .await;
120
121        if response.is_ok() {
122            log::info!("We've already uploaded this crash");
123            return Ok(());
124        }
125
126        blob_store_client
127            .put_object()
128            .bucket(CRASH_REPORTS_BUCKET)
129            .key(report.header.incident_id.clone() + ".ips")
130            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
131            .body(ByteStream::from(body.to_vec()))
132            .send()
133            .await
134            .map_err(|e| log::error!("Failed to upload crash: {}", e))
135            .ok();
136    }
137
138    let recent_panic_on: Option<i64> = headers
139        .get("x-zed-panicked-on")
140        .and_then(|h| h.to_str().ok())
141        .and_then(|s| s.parse().ok());
142    let mut recent_panic = None;
143
144    if let Some(recent_panic_on) = recent_panic_on {
145        let crashed_at = match report.timestamp() {
146            Ok(t) => Some(t),
147            Err(e) => {
148                log::error!("Can't parse {}: {}", report.header.timestamp, e);
149                None
150            }
151        };
152        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
153            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
154        }
155    }
156
157    let description = report.description(recent_panic);
158    let summary = report.backtrace_summary();
159
160    tracing::error!(
161        service = "client",
162        version = %report.header.app_version,
163        os_version = %report.header.os_version,
164        bundle_id = %report.header.bundle_id,
165        incident_id = %report.header.incident_id,
166        description = %description,
167        backtrace = %summary,
168        "crash report");
169
170    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
171        let payload = slack::WebhookBody::new(|w| {
172            w.add_section(|s| s.text(slack::Text::markdown(description)))
173                .add_section(|s| {
174                    s.add_field(slack::Text::markdown(format!(
175                        "*Version:*\n{} ({})",
176                        bundle_id, app_version
177                    )))
178                    .add_field({
179                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
180                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
181                            hostname.strip_prefix("http://").unwrap_or_default()
182                        });
183
184                        slack::Text::markdown(format!(
185                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
186                            CRASH_REPORTS_BUCKET,
187                            hostname,
188                            report.header.incident_id,
189                            report
190                                .header
191                                .incident_id
192                                .chars()
193                                .take(8)
194                                .collect::<String>(),
195                        ))
196                    })
197                })
198                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
199        });
200        let payload_json = serde_json::to_string(&payload).map_err(|err| {
201            log::error!("Failed to serialize payload to JSON: {err}");
202            Error::Internal(anyhow!(err))
203        })?;
204
205        reqwest::Client::new()
206            .post(slack_panics_webhook)
207            .header("Content-Type", "application/json")
208            .body(payload_json)
209            .send()
210            .await
211            .map_err(|err| {
212                log::error!("Failed to send payload to Slack: {err}");
213                Error::Internal(anyhow!(err))
214            })?;
215    }
216
217    Ok(())
218}
219
220pub async fn post_events(
221    Extension(app): Extension<Arc<AppState>>,
222    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
223    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
224    body: Bytes,
225) -> Result<()> {
226    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
227        Err(Error::Http(
228            StatusCode::NOT_IMPLEMENTED,
229            "not supported".into(),
230        ))?
231    };
232
233    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
234        return Err(Error::Http(
235            StatusCode::INTERNAL_SERVER_ERROR,
236            "events not enabled".into(),
237        ))?;
238    };
239
240    let mut summer = Sha256::new();
241    summer.update(checksum_seed);
242    summer.update(&body);
243    summer.update(checksum_seed);
244
245    if &checksum != &summer.finalize()[..] {
246        return Err(Error::Http(
247            StatusCode::BAD_REQUEST,
248            "invalid checksum".into(),
249        ))?;
250    }
251
252    let request_body: telemetry_events::EventRequestBody =
253        serde_json::from_slice(&body).map_err(|err| {
254            log::error!("can't parse event json: {err}");
255            Error::Internal(anyhow!(err))
256        })?;
257
258    let mut to_upload = ToUpload::default();
259    let Some(last_event) = request_body.events.last() else {
260        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
261    };
262    let country_code = country_code_header.map(|h| h.0 .0);
263
264    let first_event_at = chrono::Utc::now()
265        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
266
267    for wrapper in &request_body.events {
268        match &wrapper.event {
269            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
270                event.clone(),
271                &wrapper,
272                &request_body,
273                first_event_at,
274                country_code.clone(),
275            )),
276            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
277                event.clone(),
278                &wrapper,
279                &request_body,
280                first_event_at,
281                country_code.clone(),
282            )),
283            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
284                event.clone(),
285                &wrapper,
286                &request_body,
287                first_event_at,
288            )),
289            Event::Assistant(event) => {
290                to_upload
291                    .assistant_events
292                    .push(AssistantEventRow::from_event(
293                        event.clone(),
294                        &wrapper,
295                        &request_body,
296                        first_event_at,
297                    ))
298            }
299            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
300                event.clone(),
301                &wrapper,
302                &request_body,
303                first_event_at,
304            )),
305            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
306                event.clone(),
307                &wrapper,
308                &request_body,
309                first_event_at,
310            )),
311            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
312                event.clone(),
313                &wrapper,
314                &request_body,
315                first_event_at,
316            )),
317            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
318                event.clone(),
319                &wrapper,
320                &request_body,
321                first_event_at,
322            )),
323            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
324                event.clone(),
325                &wrapper,
326                &request_body,
327                first_event_at,
328            )),
329            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
330                event.clone(),
331                &wrapper,
332                &request_body,
333                first_event_at,
334            )),
335        }
336    }
337
338    to_upload
339        .upload(&clickhouse_client)
340        .await
341        .map_err(|err| Error::Internal(anyhow!(err)))?;
342
343    Ok(())
344}
345
346#[derive(Default)]
347struct ToUpload {
348    editor_events: Vec<EditorEventRow>,
349    copilot_events: Vec<CopilotEventRow>,
350    assistant_events: Vec<AssistantEventRow>,
351    call_events: Vec<CallEventRow>,
352    cpu_events: Vec<CpuEventRow>,
353    memory_events: Vec<MemoryEventRow>,
354    app_events: Vec<AppEventRow>,
355    setting_events: Vec<SettingEventRow>,
356    edit_events: Vec<EditEventRow>,
357    action_events: Vec<ActionEventRow>,
358}
359
360impl ToUpload {
361    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
362        Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
363            .await
364            .with_context(|| format!("failed to upload to table 'editor_events'"))?;
365        Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
366            .await
367            .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
368        Self::upload_to_table(
369            "assistant_events",
370            &self.assistant_events,
371            clickhouse_client,
372        )
373        .await
374        .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
375        Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
376            .await
377            .with_context(|| format!("failed to upload to table 'call_events'"))?;
378        Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
379            .await
380            .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
381        Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
382            .await
383            .with_context(|| format!("failed to upload to table 'memory_events'"))?;
384        Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
385            .await
386            .with_context(|| format!("failed to upload to table 'app_events'"))?;
387        Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
388            .await
389            .with_context(|| format!("failed to upload to table 'setting_events'"))?;
390        Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
391            .await
392            .with_context(|| format!("failed to upload to table 'edit_events'"))?;
393        Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
394            .await
395            .with_context(|| format!("failed to upload to table 'action_events'"))?;
396        Ok(())
397    }
398
399    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
400        table: &str,
401        rows: &[T],
402        clickhouse_client: &clickhouse::Client,
403    ) -> anyhow::Result<()> {
404        if !rows.is_empty() {
405            let mut insert = clickhouse_client.insert(table)?;
406
407            for event in rows {
408                insert.write(event).await?;
409            }
410
411            insert.end().await?;
412        }
413
414        Ok(())
415    }
416}
417
418pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
419where
420    S: Serializer,
421{
422    if country_code.len() != 2 {
423        use serde::ser::Error;
424        return Err(S::Error::custom(
425            "country_code must be exactly 2 characters",
426        ));
427    }
428
429    let country_code = country_code.as_bytes();
430
431    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
432}
433
434#[derive(Serialize, Debug, clickhouse::Row)]
435pub struct EditorEventRow {
436    pub installation_id: String,
437    pub operation: String,
438    pub app_version: String,
439    pub file_extension: String,
440    pub os_name: String,
441    pub os_version: String,
442    pub release_channel: String,
443    pub signed_in: bool,
444    pub vim_mode: bool,
445    #[serde(serialize_with = "serialize_country_code")]
446    pub country_code: String,
447    pub region_code: String,
448    pub city: String,
449    pub time: i64,
450    pub copilot_enabled: bool,
451    pub copilot_enabled_for_language: bool,
452    pub historical_event: bool,
453    pub architecture: String,
454    pub is_staff: Option<bool>,
455    pub session_id: Option<String>,
456    pub major: Option<i32>,
457    pub minor: Option<i32>,
458    pub patch: Option<i32>,
459}
460
461impl EditorEventRow {
462    fn from_event(
463        event: EditorEvent,
464        wrapper: &EventWrapper,
465        body: &EventRequestBody,
466        first_event_at: chrono::DateTime<chrono::Utc>,
467        country_code: Option<String>,
468    ) -> Self {
469        let semver = body.semver();
470        let time =
471            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
472
473        Self {
474            app_version: body.app_version.clone(),
475            major: semver.map(|s| s.major as i32),
476            minor: semver.map(|s| s.minor as i32),
477            patch: semver.map(|s| s.patch as i32),
478            release_channel: body.release_channel.clone().unwrap_or_default(),
479            os_name: body.os_name.clone(),
480            os_version: body.os_version.clone().unwrap_or_default(),
481            architecture: body.architecture.clone(),
482            installation_id: body.installation_id.clone().unwrap_or_default(),
483            session_id: body.session_id.clone(),
484            is_staff: body.is_staff,
485            time: time.timestamp_millis(),
486            operation: event.operation,
487            file_extension: event.file_extension.unwrap_or_default(),
488            signed_in: wrapper.signed_in,
489            vim_mode: event.vim_mode,
490            copilot_enabled: event.copilot_enabled,
491            copilot_enabled_for_language: event.copilot_enabled_for_language,
492            country_code: country_code.unwrap_or("XX".to_string()),
493            region_code: "".to_string(),
494            city: "".to_string(),
495            historical_event: false,
496        }
497    }
498}
499
500#[derive(Serialize, Debug, clickhouse::Row)]
501pub struct CopilotEventRow {
502    pub installation_id: String,
503    pub suggestion_id: String,
504    pub suggestion_accepted: bool,
505    pub app_version: String,
506    pub file_extension: String,
507    pub os_name: String,
508    pub os_version: String,
509    pub release_channel: String,
510    pub signed_in: bool,
511    #[serde(serialize_with = "serialize_country_code")]
512    pub country_code: String,
513    pub region_code: String,
514    pub city: String,
515    pub time: i64,
516    pub is_staff: Option<bool>,
517    pub session_id: Option<String>,
518    pub major: Option<i32>,
519    pub minor: Option<i32>,
520    pub patch: Option<i32>,
521}
522
523impl CopilotEventRow {
524    fn from_event(
525        event: CopilotEvent,
526        wrapper: &EventWrapper,
527        body: &EventRequestBody,
528        first_event_at: chrono::DateTime<chrono::Utc>,
529        country_code: Option<String>,
530    ) -> Self {
531        let semver = body.semver();
532        let time =
533            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
534
535        Self {
536            app_version: body.app_version.clone(),
537            major: semver.map(|s| s.major as i32),
538            minor: semver.map(|s| s.minor as i32),
539            patch: semver.map(|s| s.patch as i32),
540            release_channel: body.release_channel.clone().unwrap_or_default(),
541            os_name: body.os_name.clone(),
542            os_version: body.os_version.clone().unwrap_or_default(),
543            installation_id: body.installation_id.clone().unwrap_or_default(),
544            session_id: body.session_id.clone(),
545            is_staff: body.is_staff,
546            time: time.timestamp_millis(),
547            file_extension: event.file_extension.unwrap_or_default(),
548            signed_in: wrapper.signed_in,
549            country_code: country_code.unwrap_or("XX".to_string()),
550            region_code: "".to_string(),
551            city: "".to_string(),
552            suggestion_id: event.suggestion_id.unwrap_or_default(),
553            suggestion_accepted: event.suggestion_accepted,
554        }
555    }
556}
557
558#[derive(Serialize, Debug, clickhouse::Row)]
559pub struct CallEventRow {
560    // AppInfoBase
561    app_version: String,
562    major: Option<i32>,
563    minor: Option<i32>,
564    patch: Option<i32>,
565    release_channel: String,
566
567    // ClientEventBase
568    installation_id: String,
569    session_id: Option<String>,
570    is_staff: Option<bool>,
571    time: i64,
572
573    // CallEventRow
574    operation: String,
575    room_id: Option<u64>,
576    channel_id: Option<u64>,
577}
578
579impl CallEventRow {
580    fn from_event(
581        event: CallEvent,
582        wrapper: &EventWrapper,
583        body: &EventRequestBody,
584        first_event_at: chrono::DateTime<chrono::Utc>,
585    ) -> Self {
586        let semver = body.semver();
587        let time =
588            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
589
590        Self {
591            app_version: body.app_version.clone(),
592            major: semver.map(|s| s.major as i32),
593            minor: semver.map(|s| s.minor as i32),
594            patch: semver.map(|s| s.patch as i32),
595            release_channel: body.release_channel.clone().unwrap_or_default(),
596            installation_id: body.installation_id.clone().unwrap_or_default(),
597            session_id: body.session_id.clone(),
598            is_staff: body.is_staff,
599            time: time.timestamp_millis(),
600            operation: event.operation,
601            room_id: event.room_id,
602            channel_id: event.channel_id,
603        }
604    }
605}
606
607#[derive(Serialize, Debug, clickhouse::Row)]
608pub struct AssistantEventRow {
609    // AppInfoBase
610    app_version: String,
611    major: Option<i32>,
612    minor: Option<i32>,
613    patch: Option<i32>,
614    release_channel: String,
615
616    // ClientEventBase
617    installation_id: Option<String>,
618    session_id: Option<String>,
619    is_staff: Option<bool>,
620    time: i64,
621
622    // AssistantEventRow
623    conversation_id: String,
624    kind: String,
625    model: String,
626}
627
628impl AssistantEventRow {
629    fn from_event(
630        event: AssistantEvent,
631        wrapper: &EventWrapper,
632        body: &EventRequestBody,
633        first_event_at: chrono::DateTime<chrono::Utc>,
634    ) -> Self {
635        let semver = body.semver();
636        let time =
637            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
638
639        Self {
640            app_version: body.app_version.clone(),
641            major: semver.map(|s| s.major as i32),
642            minor: semver.map(|s| s.minor as i32),
643            patch: semver.map(|s| s.patch as i32),
644            release_channel: body.release_channel.clone().unwrap_or_default(),
645            installation_id: body.installation_id.clone(),
646            session_id: body.session_id.clone(),
647            is_staff: body.is_staff,
648            time: time.timestamp_millis(),
649            conversation_id: event.conversation_id.unwrap_or_default(),
650            kind: event.kind.to_string(),
651            model: event.model,
652        }
653    }
654}
655
656#[derive(Debug, clickhouse::Row, Serialize)]
657pub struct CpuEventRow {
658    pub installation_id: Option<String>,
659    pub is_staff: Option<bool>,
660    pub usage_as_percentage: f32,
661    pub core_count: u32,
662    pub app_version: String,
663    pub release_channel: String,
664    pub time: i64,
665    pub session_id: Option<String>,
666    // pub normalized_cpu_usage: f64, MATERIALIZED
667    pub major: Option<i32>,
668    pub minor: Option<i32>,
669    pub patch: Option<i32>,
670}
671
672impl CpuEventRow {
673    fn from_event(
674        event: CpuEvent,
675        wrapper: &EventWrapper,
676        body: &EventRequestBody,
677        first_event_at: chrono::DateTime<chrono::Utc>,
678    ) -> Self {
679        let semver = body.semver();
680        let time =
681            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
682
683        Self {
684            app_version: body.app_version.clone(),
685            major: semver.map(|s| s.major as i32),
686            minor: semver.map(|s| s.minor as i32),
687            patch: semver.map(|s| s.patch as i32),
688            release_channel: body.release_channel.clone().unwrap_or_default(),
689            installation_id: body.installation_id.clone(),
690            session_id: body.session_id.clone(),
691            is_staff: body.is_staff,
692            time: time.timestamp_millis(),
693            usage_as_percentage: event.usage_as_percentage,
694            core_count: event.core_count,
695        }
696    }
697}
698
699#[derive(Serialize, Debug, clickhouse::Row)]
700pub struct MemoryEventRow {
701    // AppInfoBase
702    app_version: String,
703    major: Option<i32>,
704    minor: Option<i32>,
705    patch: Option<i32>,
706    release_channel: String,
707
708    // ClientEventBase
709    installation_id: Option<String>,
710    session_id: Option<String>,
711    is_staff: Option<bool>,
712    time: i64,
713
714    // MemoryEventRow
715    memory_in_bytes: u64,
716    virtual_memory_in_bytes: u64,
717}
718
719impl MemoryEventRow {
720    fn from_event(
721        event: MemoryEvent,
722        wrapper: &EventWrapper,
723        body: &EventRequestBody,
724        first_event_at: chrono::DateTime<chrono::Utc>,
725    ) -> Self {
726        let semver = body.semver();
727        let time =
728            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
729
730        Self {
731            app_version: body.app_version.clone(),
732            major: semver.map(|s| s.major as i32),
733            minor: semver.map(|s| s.minor as i32),
734            patch: semver.map(|s| s.patch as i32),
735            release_channel: body.release_channel.clone().unwrap_or_default(),
736            installation_id: body.installation_id.clone(),
737            session_id: body.session_id.clone(),
738            is_staff: body.is_staff,
739            time: time.timestamp_millis(),
740            memory_in_bytes: event.memory_in_bytes,
741            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
742        }
743    }
744}
745
746#[derive(Serialize, Debug, clickhouse::Row)]
747pub struct AppEventRow {
748    // AppInfoBase
749    app_version: String,
750    major: Option<i32>,
751    minor: Option<i32>,
752    patch: Option<i32>,
753    release_channel: String,
754
755    // ClientEventBase
756    installation_id: Option<String>,
757    session_id: Option<String>,
758    is_staff: Option<bool>,
759    time: i64,
760
761    // AppEventRow
762    operation: String,
763}
764
765impl AppEventRow {
766    fn from_event(
767        event: AppEvent,
768        wrapper: &EventWrapper,
769        body: &EventRequestBody,
770        first_event_at: chrono::DateTime<chrono::Utc>,
771    ) -> Self {
772        let semver = body.semver();
773        let time =
774            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
775
776        Self {
777            app_version: body.app_version.clone(),
778            major: semver.map(|s| s.major as i32),
779            minor: semver.map(|s| s.minor as i32),
780            patch: semver.map(|s| s.patch as i32),
781            release_channel: body.release_channel.clone().unwrap_or_default(),
782            installation_id: body.installation_id.clone(),
783            session_id: body.session_id.clone(),
784            is_staff: body.is_staff,
785            time: time.timestamp_millis(),
786            operation: event.operation,
787        }
788    }
789}
790
791#[derive(Serialize, Debug, clickhouse::Row)]
792pub struct SettingEventRow {
793    // AppInfoBase
794    app_version: String,
795    major: Option<i32>,
796    minor: Option<i32>,
797    patch: Option<i32>,
798    release_channel: String,
799
800    // ClientEventBase
801    installation_id: Option<String>,
802    session_id: Option<String>,
803    is_staff: Option<bool>,
804    time: i64,
805    // SettingEventRow
806    setting: String,
807    value: String,
808}
809
810impl SettingEventRow {
811    fn from_event(
812        event: SettingEvent,
813        wrapper: &EventWrapper,
814        body: &EventRequestBody,
815        first_event_at: chrono::DateTime<chrono::Utc>,
816    ) -> Self {
817        let semver = body.semver();
818        let time =
819            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
820
821        Self {
822            app_version: body.app_version.clone(),
823            major: semver.map(|s| s.major as i32),
824            minor: semver.map(|s| s.minor as i32),
825            patch: semver.map(|s| s.patch as i32),
826            release_channel: body.release_channel.clone().unwrap_or_default(),
827            installation_id: body.installation_id.clone(),
828            session_id: body.session_id.clone(),
829            is_staff: body.is_staff,
830            time: time.timestamp_millis(),
831            setting: event.setting,
832            value: event.value,
833        }
834    }
835}
836
837#[derive(Serialize, Debug, clickhouse::Row)]
838pub struct EditEventRow {
839    // AppInfoBase
840    app_version: String,
841    major: Option<i32>,
842    minor: Option<i32>,
843    patch: Option<i32>,
844    release_channel: String,
845
846    // ClientEventBase
847    installation_id: Option<String>,
848    // Note: This column name has a typo in the ClickHouse table.
849    #[serde(rename = "sesssion_id")]
850    session_id: Option<String>,
851    is_staff: Option<bool>,
852    time: i64,
853
854    // EditEventRow
855    period_start: i64,
856    period_end: i64,
857    environment: String,
858}
859
860impl EditEventRow {
861    fn from_event(
862        event: EditEvent,
863        wrapper: &EventWrapper,
864        body: &EventRequestBody,
865        first_event_at: chrono::DateTime<chrono::Utc>,
866    ) -> Self {
867        let semver = body.semver();
868        let time =
869            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
870
871        let period_start = time - chrono::Duration::milliseconds(event.duration);
872        let period_end = time;
873
874        Self {
875            app_version: body.app_version.clone(),
876            major: semver.map(|s| s.major as i32),
877            minor: semver.map(|s| s.minor as i32),
878            patch: semver.map(|s| s.patch as i32),
879            release_channel: body.release_channel.clone().unwrap_or_default(),
880            installation_id: body.installation_id.clone(),
881            session_id: body.session_id.clone(),
882            is_staff: body.is_staff,
883            time: time.timestamp_millis(),
884            period_start: period_start.timestamp_millis(),
885            period_end: period_end.timestamp_millis(),
886            environment: event.environment,
887        }
888    }
889}
890
891#[derive(Serialize, Debug, clickhouse::Row)]
892pub struct ActionEventRow {
893    // AppInfoBase
894    app_version: String,
895    major: Option<i32>,
896    minor: Option<i32>,
897    patch: Option<i32>,
898    release_channel: String,
899
900    // ClientEventBase
901    installation_id: Option<String>,
902    // Note: This column name has a typo in the ClickHouse table.
903    #[serde(rename = "sesssion_id")]
904    session_id: Option<String>,
905    is_staff: Option<bool>,
906    time: i64,
907    // ActionEventRow
908    source: String,
909    action: String,
910}
911
912impl ActionEventRow {
913    fn from_event(
914        event: ActionEvent,
915        wrapper: &EventWrapper,
916        body: &EventRequestBody,
917        first_event_at: chrono::DateTime<chrono::Utc>,
918    ) -> Self {
919        let semver = body.semver();
920        let time =
921            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
922
923        Self {
924            app_version: body.app_version.clone(),
925            major: semver.map(|s| s.major as i32),
926            minor: semver.map(|s| s.minor as i32),
927            patch: semver.map(|s| s.patch as i32),
928            release_channel: body.release_channel.clone().unwrap_or_default(),
929            installation_id: body.installation_id.clone(),
930            session_id: body.session_id.clone(),
931            is_staff: body.is_staff,
932            time: time.timestamp_millis(),
933            source: event.source,
934            action: event.action,
935        }
936    }
937}