events.rs

  1use std::sync::Arc;
  2
  3use anyhow::{anyhow, Context};
  4use axum::{
  5    body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
  6};
  7use hyper::StatusCode;
  8use lazy_static::lazy_static;
  9use serde::{Serialize, Serializer};
 10use sha2::{Digest, Sha256};
 11use telemetry_events::{
 12    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
 13    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 14};
 15
 16use crate::{AppState, Error, Result};
 17
 18pub fn router() -> Router {
 19    Router::new().route("/telemetry/events", post(post_events))
 20}
 21
 22lazy_static! {
 23    static ref ZED_CHECKSUM_HEADER: HeaderName = HeaderName::from_static("x-zed-checksum");
 24    static ref CLOUDFLARE_IP_COUNTRY_HEADER: HeaderName = HeaderName::from_static("cf-ipcountry");
 25}
 26
 27pub struct ZedChecksumHeader(Vec<u8>);
 28
 29impl Header for ZedChecksumHeader {
 30    fn name() -> &'static HeaderName {
 31        &ZED_CHECKSUM_HEADER
 32    }
 33
 34    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 35    where
 36        Self: Sized,
 37        I: Iterator<Item = &'i axum::http::HeaderValue>,
 38    {
 39        let checksum = values
 40            .next()
 41            .ok_or_else(axum::headers::Error::invalid)?
 42            .to_str()
 43            .map_err(|_| axum::headers::Error::invalid())?;
 44
 45        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 46        Ok(Self(bytes))
 47    }
 48
 49    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 50        unimplemented!()
 51    }
 52}
 53
 54pub struct CloudflareIpCountryHeader(String);
 55
 56impl Header for CloudflareIpCountryHeader {
 57    fn name() -> &'static HeaderName {
 58        &CLOUDFLARE_IP_COUNTRY_HEADER
 59    }
 60
 61    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 62    where
 63        Self: Sized,
 64        I: Iterator<Item = &'i axum::http::HeaderValue>,
 65    {
 66        let country_code = values
 67            .next()
 68            .ok_or_else(axum::headers::Error::invalid)?
 69            .to_str()
 70            .map_err(|_| axum::headers::Error::invalid())?;
 71
 72        Ok(Self(country_code.to_string()))
 73    }
 74
 75    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 76        unimplemented!()
 77    }
 78}
 79
 80pub async fn post_events(
 81    Extension(app): Extension<Arc<AppState>>,
 82    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 83    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 84    body: Bytes,
 85) -> Result<()> {
 86    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
 87        Err(Error::Http(
 88            StatusCode::NOT_IMPLEMENTED,
 89            "not supported".into(),
 90        ))?
 91    };
 92
 93    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
 94        return Err(Error::Http(
 95            StatusCode::INTERNAL_SERVER_ERROR,
 96            "events not enabled".into(),
 97        ))?;
 98    };
 99
100    let mut summer = Sha256::new();
101    summer.update(checksum_seed);
102    summer.update(&body);
103    summer.update(checksum_seed);
104
105    if &checksum[..] != &summer.finalize()[..] {
106        return Err(Error::Http(
107            StatusCode::BAD_REQUEST,
108            "invalid checksum".into(),
109        ))?;
110    }
111
112    let request_body: telemetry_events::EventRequestBody =
113        serde_json::from_slice(&body).map_err(|err| {
114            log::error!("can't parse event json: {err}");
115            Error::Internal(anyhow!(err))
116        })?;
117
118    let mut to_upload = ToUpload::default();
119    let Some(last_event) = request_body.events.last() else {
120        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
121    };
122    let country_code = country_code_header.map(|h| h.0 .0);
123
124    let first_event_at = chrono::Utc::now()
125        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
126
127    for wrapper in &request_body.events {
128        match &wrapper.event {
129            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
130                event.clone(),
131                &wrapper,
132                &request_body,
133                first_event_at,
134                country_code.clone(),
135            )),
136            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
137                event.clone(),
138                &wrapper,
139                &request_body,
140                first_event_at,
141                country_code.clone(),
142            )),
143            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
144                event.clone(),
145                &wrapper,
146                &request_body,
147                first_event_at,
148            )),
149            Event::Assistant(event) => {
150                to_upload
151                    .assistant_events
152                    .push(AssistantEventRow::from_event(
153                        event.clone(),
154                        &wrapper,
155                        &request_body,
156                        first_event_at,
157                    ))
158            }
159            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
160                event.clone(),
161                &wrapper,
162                &request_body,
163                first_event_at,
164            )),
165            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
166                event.clone(),
167                &wrapper,
168                &request_body,
169                first_event_at,
170            )),
171            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
172                event.clone(),
173                &wrapper,
174                &request_body,
175                first_event_at,
176            )),
177            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
178                event.clone(),
179                &wrapper,
180                &request_body,
181                first_event_at,
182            )),
183            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
184                event.clone(),
185                &wrapper,
186                &request_body,
187                first_event_at,
188            )),
189            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
190                event.clone(),
191                &wrapper,
192                &request_body,
193                first_event_at,
194            )),
195        }
196    }
197
198    to_upload
199        .upload(&clickhouse_client)
200        .await
201        .map_err(|err| Error::Internal(anyhow!(err)))?;
202
203    Ok(())
204}
205
206#[derive(Default)]
207struct ToUpload {
208    editor_events: Vec<EditorEventRow>,
209    copilot_events: Vec<CopilotEventRow>,
210    assistant_events: Vec<AssistantEventRow>,
211    call_events: Vec<CallEventRow>,
212    cpu_events: Vec<CpuEventRow>,
213    memory_events: Vec<MemoryEventRow>,
214    app_events: Vec<AppEventRow>,
215    setting_events: Vec<SettingEventRow>,
216    edit_events: Vec<EditEventRow>,
217    action_events: Vec<ActionEventRow>,
218}
219
220impl ToUpload {
221    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
222        Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
223            .await
224            .with_context(|| format!("failed to upload to table 'editor_events'"))?;
225        Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
226            .await
227            .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
228        Self::upload_to_table(
229            "assistant_events",
230            &self.assistant_events,
231            clickhouse_client,
232        )
233        .await
234        .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
235        Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
236            .await
237            .with_context(|| format!("failed to upload to table 'call_events'"))?;
238        Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
239            .await
240            .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
241        Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
242            .await
243            .with_context(|| format!("failed to upload to table 'memory_events'"))?;
244        Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
245            .await
246            .with_context(|| format!("failed to upload to table 'app_events'"))?;
247        Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
248            .await
249            .with_context(|| format!("failed to upload to table 'setting_events'"))?;
250        Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
251            .await
252            .with_context(|| format!("failed to upload to table 'edit_events'"))?;
253        Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
254            .await
255            .with_context(|| format!("failed to upload to table 'action_events'"))?;
256        Ok(())
257    }
258
259    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
260        table: &str,
261        rows: &[T],
262        clickhouse_client: &clickhouse::Client,
263    ) -> anyhow::Result<()> {
264        if !rows.is_empty() {
265            let mut insert = clickhouse_client.insert(table)?;
266
267            for event in rows {
268                insert.write(event).await?;
269            }
270
271            insert.end().await?;
272        }
273
274        Ok(())
275    }
276}
277
278pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
279where
280    S: Serializer,
281{
282    if country_code.len() != 2 {
283        use serde::ser::Error;
284        return Err(S::Error::custom(
285            "country_code must be exactly 2 characters",
286        ));
287    }
288
289    let country_code = country_code.as_bytes();
290
291    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
292}
293
294#[derive(Serialize, Debug, clickhouse::Row)]
295pub struct EditorEventRow {
296    pub installation_id: String,
297    pub operation: String,
298    pub app_version: String,
299    pub file_extension: String,
300    pub os_name: String,
301    pub os_version: String,
302    pub release_channel: String,
303    pub signed_in: bool,
304    pub vim_mode: bool,
305    #[serde(serialize_with = "serialize_country_code")]
306    pub country_code: String,
307    pub region_code: String,
308    pub city: String,
309    pub time: i64,
310    pub copilot_enabled: bool,
311    pub copilot_enabled_for_language: bool,
312    pub historical_event: bool,
313    pub architecture: String,
314    pub is_staff: Option<bool>,
315    pub session_id: Option<String>,
316    pub major: Option<i32>,
317    pub minor: Option<i32>,
318    pub patch: Option<i32>,
319}
320
321impl EditorEventRow {
322    fn from_event(
323        event: EditorEvent,
324        wrapper: &EventWrapper,
325        body: &EventRequestBody,
326        first_event_at: chrono::DateTime<chrono::Utc>,
327        country_code: Option<String>,
328    ) -> Self {
329        let semver = body.semver();
330        let time =
331            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
332
333        Self {
334            app_version: body.app_version.clone(),
335            major: semver.map(|s| s.major as i32),
336            minor: semver.map(|s| s.minor as i32),
337            patch: semver.map(|s| s.patch as i32),
338            release_channel: body.release_channel.clone().unwrap_or_default(),
339            os_name: body.os_name.clone(),
340            os_version: body.os_version.clone().unwrap_or_default(),
341            architecture: body.architecture.clone(),
342            installation_id: body.installation_id.clone().unwrap_or_default(),
343            session_id: body.session_id.clone(),
344            is_staff: body.is_staff,
345            time: time.timestamp_millis(),
346            operation: event.operation,
347            file_extension: event.file_extension.unwrap_or_default(),
348            signed_in: wrapper.signed_in,
349            vim_mode: event.vim_mode,
350            copilot_enabled: event.copilot_enabled,
351            copilot_enabled_for_language: event.copilot_enabled_for_language,
352            country_code: country_code.unwrap_or("XX".to_string()),
353            region_code: "".to_string(),
354            city: "".to_string(),
355            historical_event: false,
356        }
357    }
358}
359
360#[derive(Serialize, Debug, clickhouse::Row)]
361pub struct CopilotEventRow {
362    pub installation_id: String,
363    pub suggestion_id: String,
364    pub suggestion_accepted: bool,
365    pub app_version: String,
366    pub file_extension: String,
367    pub os_name: String,
368    pub os_version: String,
369    pub release_channel: String,
370    pub signed_in: bool,
371    #[serde(serialize_with = "serialize_country_code")]
372    pub country_code: String,
373    pub region_code: String,
374    pub city: String,
375    pub time: i64,
376    pub is_staff: Option<bool>,
377    pub session_id: Option<String>,
378    pub major: Option<i32>,
379    pub minor: Option<i32>,
380    pub patch: Option<i32>,
381}
382
383impl CopilotEventRow {
384    fn from_event(
385        event: CopilotEvent,
386        wrapper: &EventWrapper,
387        body: &EventRequestBody,
388        first_event_at: chrono::DateTime<chrono::Utc>,
389        country_code: Option<String>,
390    ) -> Self {
391        let semver = body.semver();
392        let time =
393            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
394
395        Self {
396            app_version: body.app_version.clone(),
397            major: semver.map(|s| s.major as i32),
398            minor: semver.map(|s| s.minor as i32),
399            patch: semver.map(|s| s.patch as i32),
400            release_channel: body.release_channel.clone().unwrap_or_default(),
401            os_name: body.os_name.clone(),
402            os_version: body.os_version.clone().unwrap_or_default(),
403            installation_id: body.installation_id.clone().unwrap_or_default(),
404            session_id: body.session_id.clone(),
405            is_staff: body.is_staff,
406            time: time.timestamp_millis(),
407            file_extension: event.file_extension.unwrap_or_default(),
408            signed_in: wrapper.signed_in,
409            country_code: country_code.unwrap_or("XX".to_string()),
410            region_code: "".to_string(),
411            city: "".to_string(),
412            suggestion_id: event.suggestion_id.unwrap_or_default(),
413            suggestion_accepted: event.suggestion_accepted,
414        }
415    }
416}
417
418#[derive(Serialize, Debug, clickhouse::Row)]
419pub struct CallEventRow {
420    // AppInfoBase
421    app_version: String,
422    major: Option<i32>,
423    minor: Option<i32>,
424    patch: Option<i32>,
425    release_channel: String,
426
427    // ClientEventBase
428    installation_id: String,
429    session_id: Option<String>,
430    is_staff: Option<bool>,
431    time: i64,
432
433    // CallEventRow
434    operation: String,
435    room_id: Option<u64>,
436    channel_id: Option<u64>,
437}
438
439impl CallEventRow {
440    fn from_event(
441        event: CallEvent,
442        wrapper: &EventWrapper,
443        body: &EventRequestBody,
444        first_event_at: chrono::DateTime<chrono::Utc>,
445    ) -> Self {
446        let semver = body.semver();
447        let time =
448            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
449
450        Self {
451            app_version: body.app_version.clone(),
452            major: semver.map(|s| s.major as i32),
453            minor: semver.map(|s| s.minor as i32),
454            patch: semver.map(|s| s.patch as i32),
455            release_channel: body.release_channel.clone().unwrap_or_default(),
456            installation_id: body.installation_id.clone().unwrap_or_default(),
457            session_id: body.session_id.clone(),
458            is_staff: body.is_staff,
459            time: time.timestamp_millis(),
460            operation: event.operation,
461            room_id: event.room_id,
462            channel_id: event.channel_id,
463        }
464    }
465}
466
467#[derive(Serialize, Debug, clickhouse::Row)]
468pub struct AssistantEventRow {
469    // AppInfoBase
470    app_version: String,
471    major: Option<i32>,
472    minor: Option<i32>,
473    patch: Option<i32>,
474    release_channel: String,
475
476    // ClientEventBase
477    installation_id: Option<String>,
478    session_id: Option<String>,
479    is_staff: Option<bool>,
480    time: i64,
481
482    // AssistantEventRow
483    conversation_id: String,
484    kind: String,
485    model: String,
486}
487
488impl AssistantEventRow {
489    fn from_event(
490        event: AssistantEvent,
491        wrapper: &EventWrapper,
492        body: &EventRequestBody,
493        first_event_at: chrono::DateTime<chrono::Utc>,
494    ) -> Self {
495        let semver = body.semver();
496        let time =
497            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
498
499        Self {
500            app_version: body.app_version.clone(),
501            major: semver.map(|s| s.major as i32),
502            minor: semver.map(|s| s.minor as i32),
503            patch: semver.map(|s| s.patch as i32),
504            release_channel: body.release_channel.clone().unwrap_or_default(),
505            installation_id: body.installation_id.clone(),
506            session_id: body.session_id.clone(),
507            is_staff: body.is_staff,
508            time: time.timestamp_millis(),
509            conversation_id: event.conversation_id.unwrap_or_default(),
510            kind: event.kind.to_string(),
511            model: event.model,
512        }
513    }
514}
515
516#[derive(Debug, clickhouse::Row, Serialize)]
517pub struct CpuEventRow {
518    pub installation_id: Option<String>,
519    pub is_staff: Option<bool>,
520    pub usage_as_percentage: f32,
521    pub core_count: u32,
522    pub app_version: String,
523    pub release_channel: String,
524    pub time: i64,
525    pub session_id: Option<String>,
526    // pub normalized_cpu_usage: f64, MATERIALIZED
527    pub major: Option<i32>,
528    pub minor: Option<i32>,
529    pub patch: Option<i32>,
530}
531
532impl CpuEventRow {
533    fn from_event(
534        event: CpuEvent,
535        wrapper: &EventWrapper,
536        body: &EventRequestBody,
537        first_event_at: chrono::DateTime<chrono::Utc>,
538    ) -> Self {
539        let semver = body.semver();
540        let time =
541            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
542
543        Self {
544            app_version: body.app_version.clone(),
545            major: semver.map(|s| s.major as i32),
546            minor: semver.map(|s| s.minor as i32),
547            patch: semver.map(|s| s.patch as i32),
548            release_channel: body.release_channel.clone().unwrap_or_default(),
549            installation_id: body.installation_id.clone(),
550            session_id: body.session_id.clone(),
551            is_staff: body.is_staff,
552            time: time.timestamp_millis(),
553            usage_as_percentage: event.usage_as_percentage,
554            core_count: event.core_count,
555        }
556    }
557}
558
559#[derive(Serialize, Debug, clickhouse::Row)]
560pub struct MemoryEventRow {
561    // AppInfoBase
562    app_version: String,
563    major: Option<i32>,
564    minor: Option<i32>,
565    patch: Option<i32>,
566    release_channel: String,
567
568    // ClientEventBase
569    installation_id: Option<String>,
570    session_id: Option<String>,
571    is_staff: Option<bool>,
572    time: i64,
573
574    // MemoryEventRow
575    memory_in_bytes: u64,
576    virtual_memory_in_bytes: u64,
577}
578
579impl MemoryEventRow {
580    fn from_event(
581        event: MemoryEvent,
582        wrapper: &EventWrapper,
583        body: &EventRequestBody,
584        first_event_at: chrono::DateTime<chrono::Utc>,
585    ) -> Self {
586        let semver = body.semver();
587        let time =
588            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
589
590        Self {
591            app_version: body.app_version.clone(),
592            major: semver.map(|s| s.major as i32),
593            minor: semver.map(|s| s.minor as i32),
594            patch: semver.map(|s| s.patch as i32),
595            release_channel: body.release_channel.clone().unwrap_or_default(),
596            installation_id: body.installation_id.clone(),
597            session_id: body.session_id.clone(),
598            is_staff: body.is_staff,
599            time: time.timestamp_millis(),
600            memory_in_bytes: event.memory_in_bytes,
601            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
602        }
603    }
604}
605
606#[derive(Serialize, Debug, clickhouse::Row)]
607pub struct AppEventRow {
608    // AppInfoBase
609    app_version: String,
610    major: Option<i32>,
611    minor: Option<i32>,
612    patch: Option<i32>,
613    release_channel: String,
614
615    // ClientEventBase
616    installation_id: Option<String>,
617    session_id: Option<String>,
618    is_staff: Option<bool>,
619    time: i64,
620
621    // AppEventRow
622    operation: String,
623}
624
625impl AppEventRow {
626    fn from_event(
627        event: AppEvent,
628        wrapper: &EventWrapper,
629        body: &EventRequestBody,
630        first_event_at: chrono::DateTime<chrono::Utc>,
631    ) -> Self {
632        let semver = body.semver();
633        let time =
634            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
635
636        Self {
637            app_version: body.app_version.clone(),
638            major: semver.map(|s| s.major as i32),
639            minor: semver.map(|s| s.minor as i32),
640            patch: semver.map(|s| s.patch as i32),
641            release_channel: body.release_channel.clone().unwrap_or_default(),
642            installation_id: body.installation_id.clone(),
643            session_id: body.session_id.clone(),
644            is_staff: body.is_staff,
645            time: time.timestamp_millis(),
646            operation: event.operation,
647        }
648    }
649}
650
651#[derive(Serialize, Debug, clickhouse::Row)]
652pub struct SettingEventRow {
653    // AppInfoBase
654    app_version: String,
655    major: Option<i32>,
656    minor: Option<i32>,
657    patch: Option<i32>,
658    release_channel: String,
659
660    // ClientEventBase
661    installation_id: Option<String>,
662    session_id: Option<String>,
663    is_staff: Option<bool>,
664    time: i64,
665    // SettingEventRow
666    setting: String,
667    value: String,
668}
669
670impl SettingEventRow {
671    fn from_event(
672        event: SettingEvent,
673        wrapper: &EventWrapper,
674        body: &EventRequestBody,
675        first_event_at: chrono::DateTime<chrono::Utc>,
676    ) -> Self {
677        let semver = body.semver();
678        let time =
679            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
680
681        Self {
682            app_version: body.app_version.clone(),
683            major: semver.map(|s| s.major as i32),
684            minor: semver.map(|s| s.minor as i32),
685            patch: semver.map(|s| s.patch as i32),
686            release_channel: body.release_channel.clone().unwrap_or_default(),
687            installation_id: body.installation_id.clone(),
688            session_id: body.session_id.clone(),
689            is_staff: body.is_staff,
690            time: time.timestamp_millis(),
691            setting: event.setting,
692            value: event.value,
693        }
694    }
695}
696
697#[derive(Serialize, Debug, clickhouse::Row)]
698pub struct EditEventRow {
699    // AppInfoBase
700    app_version: String,
701    major: Option<i32>,
702    minor: Option<i32>,
703    patch: Option<i32>,
704    release_channel: String,
705
706    // ClientEventBase
707    installation_id: Option<String>,
708    // Note: This column name has a typo in the ClickHouse table.
709    #[serde(rename = "sesssion_id")]
710    session_id: Option<String>,
711    is_staff: Option<bool>,
712    time: i64,
713
714    // EditEventRow
715    period_start: i64,
716    period_end: i64,
717    environment: String,
718}
719
720impl EditEventRow {
721    fn from_event(
722        event: EditEvent,
723        wrapper: &EventWrapper,
724        body: &EventRequestBody,
725        first_event_at: chrono::DateTime<chrono::Utc>,
726    ) -> Self {
727        let semver = body.semver();
728        let time =
729            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
730
731        let period_start = time - chrono::Duration::milliseconds(event.duration);
732        let period_end = time;
733
734        Self {
735            app_version: body.app_version.clone(),
736            major: semver.map(|s| s.major as i32),
737            minor: semver.map(|s| s.minor as i32),
738            patch: semver.map(|s| s.patch as i32),
739            release_channel: body.release_channel.clone().unwrap_or_default(),
740            installation_id: body.installation_id.clone(),
741            session_id: body.session_id.clone(),
742            is_staff: body.is_staff,
743            time: time.timestamp_millis(),
744            period_start: period_start.timestamp_millis(),
745            period_end: period_end.timestamp_millis(),
746            environment: event.environment,
747        }
748    }
749}
750
751#[derive(Serialize, Debug, clickhouse::Row)]
752pub struct ActionEventRow {
753    // AppInfoBase
754    app_version: String,
755    major: Option<i32>,
756    minor: Option<i32>,
757    patch: Option<i32>,
758    release_channel: String,
759
760    // ClientEventBase
761    installation_id: Option<String>,
762    // Note: This column name has a typo in the ClickHouse table.
763    #[serde(rename = "sesssion_id")]
764    session_id: Option<String>,
765    is_staff: Option<bool>,
766    time: i64,
767    // ActionEventRow
768    source: String,
769    action: String,
770}
771
772impl ActionEventRow {
773    fn from_event(
774        event: ActionEvent,
775        wrapper: &EventWrapper,
776        body: &EventRequestBody,
777        first_event_at: chrono::DateTime<chrono::Utc>,
778    ) -> Self {
779        let semver = body.semver();
780        let time =
781            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
782
783        Self {
784            app_version: body.app_version.clone(),
785            major: semver.map(|s| s.major as i32),
786            minor: semver.map(|s| s.minor as i32),
787            patch: semver.map(|s| s.patch as i32),
788            release_channel: body.release_channel.clone().unwrap_or_default(),
789            installation_id: body.installation_id.clone(),
790            session_id: body.session_id.clone(),
791            is_staff: body.is_staff,
792            time: time.timestamp_millis(),
793            source: event.source,
794            action: event.action,
795        }
796    }
797}