events.rs

  1use std::sync::{Arc, OnceLock};
  2
  3use anyhow::{anyhow, Context};
  4use axum::{
  5    body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
  6};
  7use hyper::StatusCode;
  8use serde::{Serialize, Serializer};
  9use sha2::{Digest, Sha256};
 10use telemetry_events::{
 11    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
 12    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 13};
 14
 15use crate::{AppState, Error, Result};
 16
 17pub fn router() -> Router {
 18    Router::new().route("/telemetry/events", post(post_events))
 19}
 20
/// Typed form of the `x-zed-checksum` request header; holds the bytes obtained
/// by hex-decoding the header value.
pub struct ZedChecksumHeader(Vec<u8>);
 22
 23impl Header for ZedChecksumHeader {
 24    fn name() -> &'static HeaderName {
 25        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 26        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 27    }
 28
 29    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 30    where
 31        Self: Sized,
 32        I: Iterator<Item = &'i axum::http::HeaderValue>,
 33    {
 34        let checksum = values
 35            .next()
 36            .ok_or_else(axum::headers::Error::invalid)?
 37            .to_str()
 38            .map_err(|_| axum::headers::Error::invalid())?;
 39
 40        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 41        Ok(Self(bytes))
 42    }
 43
 44    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 45        unimplemented!()
 46    }
 47}
 48
/// Typed form of the `cf-ipcountry` request header; holds the header's string
/// value as received.
pub struct CloudflareIpCountryHeader(String);
 50
 51impl Header for CloudflareIpCountryHeader {
 52    fn name() -> &'static HeaderName {
 53        static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
 54        CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
 55    }
 56
 57    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 58    where
 59        Self: Sized,
 60        I: Iterator<Item = &'i axum::http::HeaderValue>,
 61    {
 62        let country_code = values
 63            .next()
 64            .ok_or_else(axum::headers::Error::invalid)?
 65            .to_str()
 66            .map_err(|_| axum::headers::Error::invalid())?;
 67
 68        Ok(Self(country_code.to_string()))
 69    }
 70
 71    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 72        unimplemented!()
 73    }
 74}
 75
 76pub async fn post_events(
 77    Extension(app): Extension<Arc<AppState>>,
 78    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 79    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 80    body: Bytes,
 81) -> Result<()> {
 82    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 83        Err(Error::Http(
 84            StatusCode::NOT_IMPLEMENTED,
 85            "not supported".into(),
 86        ))?
 87    };
 88
 89    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
 90        return Err(Error::Http(
 91            StatusCode::INTERNAL_SERVER_ERROR,
 92            "events not enabled".into(),
 93        ))?;
 94    };
 95
 96    let mut summer = Sha256::new();
 97    summer.update(checksum_seed);
 98    summer.update(&body);
 99    summer.update(checksum_seed);
100
101    if &checksum[..] != &summer.finalize()[..] {
102        return Err(Error::Http(
103            StatusCode::BAD_REQUEST,
104            "invalid checksum".into(),
105        ))?;
106    }
107
108    let request_body: telemetry_events::EventRequestBody =
109        serde_json::from_slice(&body).map_err(|err| {
110            log::error!("can't parse event json: {err}");
111            Error::Internal(anyhow!(err))
112        })?;
113
114    let mut to_upload = ToUpload::default();
115    let Some(last_event) = request_body.events.last() else {
116        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
117    };
118    let country_code = country_code_header.map(|h| h.0 .0);
119
120    let first_event_at = chrono::Utc::now()
121        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
122
123    for wrapper in &request_body.events {
124        match &wrapper.event {
125            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
126                event.clone(),
127                &wrapper,
128                &request_body,
129                first_event_at,
130                country_code.clone(),
131            )),
132            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
133                event.clone(),
134                &wrapper,
135                &request_body,
136                first_event_at,
137                country_code.clone(),
138            )),
139            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
140                event.clone(),
141                &wrapper,
142                &request_body,
143                first_event_at,
144            )),
145            Event::Assistant(event) => {
146                to_upload
147                    .assistant_events
148                    .push(AssistantEventRow::from_event(
149                        event.clone(),
150                        &wrapper,
151                        &request_body,
152                        first_event_at,
153                    ))
154            }
155            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
156                event.clone(),
157                &wrapper,
158                &request_body,
159                first_event_at,
160            )),
161            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
162                event.clone(),
163                &wrapper,
164                &request_body,
165                first_event_at,
166            )),
167            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
168                event.clone(),
169                &wrapper,
170                &request_body,
171                first_event_at,
172            )),
173            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
174                event.clone(),
175                &wrapper,
176                &request_body,
177                first_event_at,
178            )),
179            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
180                event.clone(),
181                &wrapper,
182                &request_body,
183                first_event_at,
184            )),
185            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
186                event.clone(),
187                &wrapper,
188                &request_body,
189                first_event_at,
190            )),
191        }
192    }
193
194    to_upload
195        .upload(&clickhouse_client)
196        .await
197        .map_err(|err| Error::Internal(anyhow!(err)))?;
198
199    Ok(())
200}
201
/// Rows collected from a single telemetry request, grouped by the table they
/// are uploaded to (one vector per row type). Starts out empty via `Default`.
#[derive(Default)]
struct ToUpload {
    editor_events: Vec<EditorEventRow>,
    copilot_events: Vec<CopilotEventRow>,
    assistant_events: Vec<AssistantEventRow>,
    call_events: Vec<CallEventRow>,
    cpu_events: Vec<CpuEventRow>,
    memory_events: Vec<MemoryEventRow>,
    app_events: Vec<AppEventRow>,
    setting_events: Vec<SettingEventRow>,
    edit_events: Vec<EditEventRow>,
    action_events: Vec<ActionEventRow>,
}
215
216impl ToUpload {
217    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
218        Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
219            .await
220            .with_context(|| format!("failed to upload to table 'editor_events'"))?;
221        Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
222            .await
223            .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
224        Self::upload_to_table(
225            "assistant_events",
226            &self.assistant_events,
227            clickhouse_client,
228        )
229        .await
230        .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
231        Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
232            .await
233            .with_context(|| format!("failed to upload to table 'call_events'"))?;
234        Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
235            .await
236            .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
237        Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
238            .await
239            .with_context(|| format!("failed to upload to table 'memory_events'"))?;
240        Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
241            .await
242            .with_context(|| format!("failed to upload to table 'app_events'"))?;
243        Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
244            .await
245            .with_context(|| format!("failed to upload to table 'setting_events'"))?;
246        Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
247            .await
248            .with_context(|| format!("failed to upload to table 'edit_events'"))?;
249        Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
250            .await
251            .with_context(|| format!("failed to upload to table 'action_events'"))?;
252        Ok(())
253    }
254
255    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
256        table: &str,
257        rows: &[T],
258        clickhouse_client: &clickhouse::Client,
259    ) -> anyhow::Result<()> {
260        if !rows.is_empty() {
261            let mut insert = clickhouse_client.insert(table)?;
262
263            for event in rows {
264                insert.write(event).await?;
265            }
266
267            insert.end().await?;
268        }
269
270        Ok(())
271    }
272}
273
274pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
275where
276    S: Serializer,
277{
278    if country_code.len() != 2 {
279        use serde::ser::Error;
280        return Err(S::Error::custom(
281            "country_code must be exactly 2 characters",
282        ));
283    }
284
285    let country_code = country_code.as_bytes();
286
287    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
288}
289
/// One row of the `editor_events` table, built from an `Event::Editor`
/// telemetry event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditorEventRow {
    pub installation_id: String,
    pub operation: String,
    pub app_version: String,
    pub file_extension: String,
    pub os_name: String,
    pub os_version: String,
    pub release_channel: String,
    pub signed_in: bool,
    pub vim_mode: bool,
    // Serialized as a `u16` packing the two code bytes (see `serialize_country_code`).
    #[serde(serialize_with = "serialize_country_code")]
    pub country_code: String,
    pub region_code: String,
    pub city: String,
    // Event time in milliseconds since the Unix epoch.
    pub time: i64,
    pub copilot_enabled: bool,
    pub copilot_enabled_for_language: bool,
    pub historical_event: bool,
    pub architecture: String,
    pub is_staff: Option<bool>,
    pub session_id: Option<String>,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
316
317impl EditorEventRow {
318    fn from_event(
319        event: EditorEvent,
320        wrapper: &EventWrapper,
321        body: &EventRequestBody,
322        first_event_at: chrono::DateTime<chrono::Utc>,
323        country_code: Option<String>,
324    ) -> Self {
325        let semver = body.semver();
326        let time =
327            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
328
329        Self {
330            app_version: body.app_version.clone(),
331            major: semver.map(|s| s.major as i32),
332            minor: semver.map(|s| s.minor as i32),
333            patch: semver.map(|s| s.patch as i32),
334            release_channel: body.release_channel.clone().unwrap_or_default(),
335            os_name: body.os_name.clone(),
336            os_version: body.os_version.clone().unwrap_or_default(),
337            architecture: body.architecture.clone(),
338            installation_id: body.installation_id.clone().unwrap_or_default(),
339            session_id: body.session_id.clone(),
340            is_staff: body.is_staff,
341            time: time.timestamp_millis(),
342            operation: event.operation,
343            file_extension: event.file_extension.unwrap_or_default(),
344            signed_in: wrapper.signed_in,
345            vim_mode: event.vim_mode,
346            copilot_enabled: event.copilot_enabled,
347            copilot_enabled_for_language: event.copilot_enabled_for_language,
348            country_code: country_code.unwrap_or("XX".to_string()),
349            region_code: "".to_string(),
350            city: "".to_string(),
351            historical_event: false,
352        }
353    }
354}
355
/// One row of the `copilot_events` table, built from an `Event::Copilot`
/// telemetry event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct CopilotEventRow {
    pub installation_id: String,
    pub suggestion_id: String,
    pub suggestion_accepted: bool,
    pub app_version: String,
    pub file_extension: String,
    pub os_name: String,
    pub os_version: String,
    pub release_channel: String,
    pub signed_in: bool,
    // Serialized as a `u16` packing the two code bytes (see `serialize_country_code`).
    #[serde(serialize_with = "serialize_country_code")]
    pub country_code: String,
    pub region_code: String,
    pub city: String,
    // Event time in milliseconds since the Unix epoch.
    pub time: i64,
    pub is_staff: Option<bool>,
    pub session_id: Option<String>,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
378
379impl CopilotEventRow {
380    fn from_event(
381        event: CopilotEvent,
382        wrapper: &EventWrapper,
383        body: &EventRequestBody,
384        first_event_at: chrono::DateTime<chrono::Utc>,
385        country_code: Option<String>,
386    ) -> Self {
387        let semver = body.semver();
388        let time =
389            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
390
391        Self {
392            app_version: body.app_version.clone(),
393            major: semver.map(|s| s.major as i32),
394            minor: semver.map(|s| s.minor as i32),
395            patch: semver.map(|s| s.patch as i32),
396            release_channel: body.release_channel.clone().unwrap_or_default(),
397            os_name: body.os_name.clone(),
398            os_version: body.os_version.clone().unwrap_or_default(),
399            installation_id: body.installation_id.clone().unwrap_or_default(),
400            session_id: body.session_id.clone(),
401            is_staff: body.is_staff,
402            time: time.timestamp_millis(),
403            file_extension: event.file_extension.unwrap_or_default(),
404            signed_in: wrapper.signed_in,
405            country_code: country_code.unwrap_or("XX".to_string()),
406            region_code: "".to_string(),
407            city: "".to_string(),
408            suggestion_id: event.suggestion_id.unwrap_or_default(),
409            suggestion_accepted: event.suggestion_accepted,
410        }
411    }
412}
413
/// One row of the `call_events` table, built from an `Event::Call` telemetry
/// event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct CallEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: String,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // CallEventRow
    operation: String,
    room_id: Option<u64>,
    channel_id: Option<u64>,
}
434
435impl CallEventRow {
436    fn from_event(
437        event: CallEvent,
438        wrapper: &EventWrapper,
439        body: &EventRequestBody,
440        first_event_at: chrono::DateTime<chrono::Utc>,
441    ) -> Self {
442        let semver = body.semver();
443        let time =
444            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
445
446        Self {
447            app_version: body.app_version.clone(),
448            major: semver.map(|s| s.major as i32),
449            minor: semver.map(|s| s.minor as i32),
450            patch: semver.map(|s| s.patch as i32),
451            release_channel: body.release_channel.clone().unwrap_or_default(),
452            installation_id: body.installation_id.clone().unwrap_or_default(),
453            session_id: body.session_id.clone(),
454            is_staff: body.is_staff,
455            time: time.timestamp_millis(),
456            operation: event.operation,
457            room_id: event.room_id,
458            channel_id: event.channel_id,
459        }
460    }
461}
462
/// One row of the `assistant_events` table, built from an `Event::Assistant`
/// telemetry event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AssistantEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // AssistantEventRow
    conversation_id: String,
    kind: String,
    model: String,
}
483
484impl AssistantEventRow {
485    fn from_event(
486        event: AssistantEvent,
487        wrapper: &EventWrapper,
488        body: &EventRequestBody,
489        first_event_at: chrono::DateTime<chrono::Utc>,
490    ) -> Self {
491        let semver = body.semver();
492        let time =
493            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
494
495        Self {
496            app_version: body.app_version.clone(),
497            major: semver.map(|s| s.major as i32),
498            minor: semver.map(|s| s.minor as i32),
499            patch: semver.map(|s| s.patch as i32),
500            release_channel: body.release_channel.clone().unwrap_or_default(),
501            installation_id: body.installation_id.clone(),
502            session_id: body.session_id.clone(),
503            is_staff: body.is_staff,
504            time: time.timestamp_millis(),
505            conversation_id: event.conversation_id.unwrap_or_default(),
506            kind: event.kind.to_string(),
507            model: event.model,
508        }
509    }
510}
511
/// One row of the `cpu_events` table, built from an `Event::Cpu` telemetry
/// event.
#[derive(Debug, clickhouse::Row, Serialize)]
pub struct CpuEventRow {
    pub installation_id: Option<String>,
    pub is_staff: Option<bool>,
    pub usage_as_percentage: f32,
    pub core_count: u32,
    pub app_version: String,
    pub release_channel: String,
    // Event time in milliseconds since the Unix epoch.
    pub time: i64,
    pub session_id: Option<String>,
    // pub normalized_cpu_usage: f64, MATERIALIZED
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    pub major: Option<i32>,
    pub minor: Option<i32>,
    pub patch: Option<i32>,
}
527
528impl CpuEventRow {
529    fn from_event(
530        event: CpuEvent,
531        wrapper: &EventWrapper,
532        body: &EventRequestBody,
533        first_event_at: chrono::DateTime<chrono::Utc>,
534    ) -> Self {
535        let semver = body.semver();
536        let time =
537            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
538
539        Self {
540            app_version: body.app_version.clone(),
541            major: semver.map(|s| s.major as i32),
542            minor: semver.map(|s| s.minor as i32),
543            patch: semver.map(|s| s.patch as i32),
544            release_channel: body.release_channel.clone().unwrap_or_default(),
545            installation_id: body.installation_id.clone(),
546            session_id: body.session_id.clone(),
547            is_staff: body.is_staff,
548            time: time.timestamp_millis(),
549            usage_as_percentage: event.usage_as_percentage,
550            core_count: event.core_count,
551        }
552    }
553}
554
/// One row of the `memory_events` table, built from an `Event::Memory`
/// telemetry event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct MemoryEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // MemoryEventRow
    memory_in_bytes: u64,
    virtual_memory_in_bytes: u64,
}
574
575impl MemoryEventRow {
576    fn from_event(
577        event: MemoryEvent,
578        wrapper: &EventWrapper,
579        body: &EventRequestBody,
580        first_event_at: chrono::DateTime<chrono::Utc>,
581    ) -> Self {
582        let semver = body.semver();
583        let time =
584            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
585
586        Self {
587            app_version: body.app_version.clone(),
588            major: semver.map(|s| s.major as i32),
589            minor: semver.map(|s| s.minor as i32),
590            patch: semver.map(|s| s.patch as i32),
591            release_channel: body.release_channel.clone().unwrap_or_default(),
592            installation_id: body.installation_id.clone(),
593            session_id: body.session_id.clone(),
594            is_staff: body.is_staff,
595            time: time.timestamp_millis(),
596            memory_in_bytes: event.memory_in_bytes,
597            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
598        }
599    }
600}
601
/// One row of the `app_events` table, built from an `Event::App` telemetry
/// event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct AppEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // AppEventRow
    operation: String,
}
620
621impl AppEventRow {
622    fn from_event(
623        event: AppEvent,
624        wrapper: &EventWrapper,
625        body: &EventRequestBody,
626        first_event_at: chrono::DateTime<chrono::Utc>,
627    ) -> Self {
628        let semver = body.semver();
629        let time =
630            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
631
632        Self {
633            app_version: body.app_version.clone(),
634            major: semver.map(|s| s.major as i32),
635            minor: semver.map(|s| s.minor as i32),
636            patch: semver.map(|s| s.patch as i32),
637            release_channel: body.release_channel.clone().unwrap_or_default(),
638            installation_id: body.installation_id.clone(),
639            session_id: body.session_id.clone(),
640            is_staff: body.is_staff,
641            time: time.timestamp_millis(),
642            operation: event.operation,
643        }
644    }
645}
646
/// One row of the `setting_events` table, built from an `Event::Setting`
/// telemetry event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct SettingEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // SettingEventRow
    setting: String,
    value: String,
}
665
666impl SettingEventRow {
667    fn from_event(
668        event: SettingEvent,
669        wrapper: &EventWrapper,
670        body: &EventRequestBody,
671        first_event_at: chrono::DateTime<chrono::Utc>,
672    ) -> Self {
673        let semver = body.semver();
674        let time =
675            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
676
677        Self {
678            app_version: body.app_version.clone(),
679            major: semver.map(|s| s.major as i32),
680            minor: semver.map(|s| s.minor as i32),
681            patch: semver.map(|s| s.patch as i32),
682            release_channel: body.release_channel.clone().unwrap_or_default(),
683            installation_id: body.installation_id.clone(),
684            session_id: body.session_id.clone(),
685            is_staff: body.is_staff,
686            time: time.timestamp_millis(),
687            setting: event.setting,
688            value: event.value,
689        }
690    }
691}
692
/// One row of the `edit_events` table, built from an `Event::Edit` telemetry
/// event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct EditEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,

    // EditEventRow
    // Both bounds are in milliseconds since the Unix epoch; `from_event` sets
    // `period_end` to the event time and `period_start` to that time minus the
    // event's duration.
    period_start: i64,
    period_end: i64,
    environment: String,
}
715
716impl EditEventRow {
717    fn from_event(
718        event: EditEvent,
719        wrapper: &EventWrapper,
720        body: &EventRequestBody,
721        first_event_at: chrono::DateTime<chrono::Utc>,
722    ) -> Self {
723        let semver = body.semver();
724        let time =
725            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
726
727        let period_start = time - chrono::Duration::milliseconds(event.duration);
728        let period_end = time;
729
730        Self {
731            app_version: body.app_version.clone(),
732            major: semver.map(|s| s.major as i32),
733            minor: semver.map(|s| s.minor as i32),
734            patch: semver.map(|s| s.patch as i32),
735            release_channel: body.release_channel.clone().unwrap_or_default(),
736            installation_id: body.installation_id.clone(),
737            session_id: body.session_id.clone(),
738            is_staff: body.is_staff,
739            time: time.timestamp_millis(),
740            period_start: period_start.timestamp_millis(),
741            period_end: period_end.timestamp_millis(),
742            environment: event.environment,
743        }
744    }
745}
746
/// One row of the `action_events` table, built from an `Event::Action`
/// telemetry event.
#[derive(Serialize, Debug, clickhouse::Row)]
pub struct ActionEventRow {
    // AppInfoBase
    app_version: String,
    // Components of the request's parsed version; `None` when `semver()` yields nothing.
    major: Option<i32>,
    minor: Option<i32>,
    patch: Option<i32>,
    release_channel: String,

    // ClientEventBase
    installation_id: Option<String>,
    // Note: This column name has a typo in the ClickHouse table.
    #[serde(rename = "sesssion_id")]
    session_id: Option<String>,
    is_staff: Option<bool>,
    // Event time in milliseconds since the Unix epoch.
    time: i64,
    // ActionEventRow
    source: String,
    action: String,
}
767
768impl ActionEventRow {
769    fn from_event(
770        event: ActionEvent,
771        wrapper: &EventWrapper,
772        body: &EventRequestBody,
773        first_event_at: chrono::DateTime<chrono::Utc>,
774    ) -> Self {
775        let semver = body.semver();
776        let time =
777            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
778
779        Self {
780            app_version: body.app_version.clone(),
781            major: semver.map(|s| s.major as i32),
782            minor: semver.map(|s| s.minor as i32),
783            patch: semver.map(|s| s.patch as i32),
784            release_channel: body.release_channel.clone().unwrap_or_default(),
785            installation_id: body.installation_id.clone(),
786            session_id: body.session_id.clone(),
787            is_staff: body.is_staff,
788            time: time.timestamp_millis(),
789            source: event.source,
790            action: event.action,
791        }
792    }
793}