1use std::sync::{Arc, OnceLock};
2
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
7};
8use hyper::{HeaderMap, StatusCode};
9use serde::{Serialize, Serializer};
10use sha2::{Digest, Sha256};
11use telemetry_events::{
12 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
13 EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
14};
15use util::SemanticVersion;
16
17use crate::{api::slack, AppState, Error, Result};
18
19use super::ips_file::IpsFile;
20
21pub fn router() -> Router {
22 Router::new()
23 .route("/telemetry/events", post(post_events))
24 .route("/telemetry/crashes", post(post_crash))
25}
26
27pub struct ZedChecksumHeader(Vec<u8>);
28
29impl Header for ZedChecksumHeader {
30 fn name() -> &'static HeaderName {
31 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
32 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
33 }
34
35 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
36 where
37 Self: Sized,
38 I: Iterator<Item = &'i axum::http::HeaderValue>,
39 {
40 let checksum = values
41 .next()
42 .ok_or_else(axum::headers::Error::invalid)?
43 .to_str()
44 .map_err(|_| axum::headers::Error::invalid())?;
45
46 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
47 Ok(Self(bytes))
48 }
49
50 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
51 unimplemented!()
52 }
53}
54
55pub struct CloudflareIpCountryHeader(String);
56
57impl Header for CloudflareIpCountryHeader {
58 fn name() -> &'static HeaderName {
59 static CLOUDFLARE_IP_COUNTRY_HEADER: OnceLock<HeaderName> = OnceLock::new();
60 CLOUDFLARE_IP_COUNTRY_HEADER.get_or_init(|| HeaderName::from_static("cf-ipcountry"))
61 }
62
63 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
64 where
65 Self: Sized,
66 I: Iterator<Item = &'i axum::http::HeaderValue>,
67 {
68 let country_code = values
69 .next()
70 .ok_or_else(axum::headers::Error::invalid)?
71 .to_str()
72 .map_err(|_| axum::headers::Error::invalid())?;
73
74 Ok(Self(country_code.to_string()))
75 }
76
77 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
78 unimplemented!()
79 }
80}
81
82pub async fn post_crash(
83 Extension(app): Extension<Arc<AppState>>,
84 body: Bytes,
85 headers: HeaderMap,
86) -> Result<()> {
87 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
88
89 let report = IpsFile::parse(&body)?;
90 let version_threshold = SemanticVersion::new(0, 123, 0);
91
92 let bundle_id = &report.header.bundle_id;
93 let app_version = &report.app_version();
94
95 if bundle_id == "dev.zed.Zed-Dev" {
96 log::error!("Crash uploads from {} are ignored.", bundle_id);
97 return Ok(());
98 }
99
100 if app_version.is_none() || app_version.unwrap() < version_threshold {
101 log::error!(
102 "Crash uploads from {} are ignored.",
103 report.header.app_version
104 );
105 return Ok(());
106 }
107 let app_version = app_version.unwrap();
108
109 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
110 let response = blob_store_client
111 .head_object()
112 .bucket(CRASH_REPORTS_BUCKET)
113 .key(report.header.incident_id.clone() + ".ips")
114 .send()
115 .await;
116
117 if response.is_ok() {
118 log::info!("We've already uploaded this crash");
119 return Ok(());
120 }
121
122 blob_store_client
123 .put_object()
124 .bucket(CRASH_REPORTS_BUCKET)
125 .key(report.header.incident_id.clone() + ".ips")
126 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
127 .body(ByteStream::from(body.to_vec()))
128 .send()
129 .await
130 .map_err(|e| log::error!("Failed to upload crash: {}", e))
131 .ok();
132 }
133
134 let recent_panic_on: Option<i64> = headers
135 .get("x-zed-panicked-on")
136 .and_then(|h| h.to_str().ok())
137 .and_then(|s| s.parse().ok());
138 let mut recent_panic = None;
139
140 if let Some(recent_panic_on) = recent_panic_on {
141 let crashed_at = match report.timestamp() {
142 Ok(t) => Some(t),
143 Err(e) => {
144 log::error!("Can't parse {}: {}", report.header.timestamp, e);
145 None
146 }
147 };
148 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
149 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
150 }
151 }
152
153 let description = report.description(recent_panic);
154 let summary = report.backtrace_summary();
155
156 tracing::error!(
157 service = "client",
158 version = %report.header.app_version,
159 os_version = %report.header.os_version,
160 bundle_id = %report.header.bundle_id,
161 incident_id = %report.header.incident_id,
162 description = %description,
163 backtrace = %summary,
164 "crash report");
165
166 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
167 let payload = slack::WebhookBody::new(|w| {
168 w.add_section(|s| s.text(slack::Text::markdown(description)))
169 .add_section(|s| {
170 s.add_field(slack::Text::markdown(format!(
171 "*Version:*\n{} ({})",
172 bundle_id, app_version
173 )))
174 .add_field({
175 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
176 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
177 hostname.strip_prefix("http://").unwrap_or_default()
178 });
179
180 slack::Text::markdown(format!(
181 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
182 CRASH_REPORTS_BUCKET,
183 hostname,
184 report.header.incident_id,
185 report
186 .header
187 .incident_id
188 .chars()
189 .take(8)
190 .collect::<String>(),
191 ))
192 })
193 })
194 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
195 });
196 let payload_json = serde_json::to_string(&payload).map_err(|err| {
197 log::error!("Failed to serialize payload to JSON: {err}");
198 Error::Internal(anyhow!(err))
199 })?;
200
201 reqwest::Client::new()
202 .post(slack_panics_webhook)
203 .header("Content-Type", "application/json")
204 .body(payload_json)
205 .send()
206 .await
207 .map_err(|err| {
208 log::error!("Failed to send payload to Slack: {err}");
209 Error::Internal(anyhow!(err))
210 })?;
211 }
212
213 Ok(())
214}
215
216pub async fn post_events(
217 Extension(app): Extension<Arc<AppState>>,
218 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
219 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
220 body: Bytes,
221) -> Result<()> {
222 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
223 Err(Error::Http(
224 StatusCode::NOT_IMPLEMENTED,
225 "not supported".into(),
226 ))?
227 };
228
229 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
230 return Err(Error::Http(
231 StatusCode::INTERNAL_SERVER_ERROR,
232 "events not enabled".into(),
233 ))?;
234 };
235
236 let mut summer = Sha256::new();
237 summer.update(checksum_seed);
238 summer.update(&body);
239 summer.update(checksum_seed);
240
241 if &checksum != &summer.finalize()[..] {
242 return Err(Error::Http(
243 StatusCode::BAD_REQUEST,
244 "invalid checksum".into(),
245 ))?;
246 }
247
248 let request_body: telemetry_events::EventRequestBody =
249 serde_json::from_slice(&body).map_err(|err| {
250 log::error!("can't parse event json: {err}");
251 Error::Internal(anyhow!(err))
252 })?;
253
254 let mut to_upload = ToUpload::default();
255 let Some(last_event) = request_body.events.last() else {
256 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
257 };
258 let country_code = country_code_header.map(|h| h.0 .0);
259
260 let first_event_at = chrono::Utc::now()
261 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
262
263 for wrapper in &request_body.events {
264 match &wrapper.event {
265 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
266 event.clone(),
267 &wrapper,
268 &request_body,
269 first_event_at,
270 country_code.clone(),
271 )),
272 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
273 event.clone(),
274 &wrapper,
275 &request_body,
276 first_event_at,
277 country_code.clone(),
278 )),
279 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
280 event.clone(),
281 &wrapper,
282 &request_body,
283 first_event_at,
284 )),
285 Event::Assistant(event) => {
286 to_upload
287 .assistant_events
288 .push(AssistantEventRow::from_event(
289 event.clone(),
290 &wrapper,
291 &request_body,
292 first_event_at,
293 ))
294 }
295 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
296 event.clone(),
297 &wrapper,
298 &request_body,
299 first_event_at,
300 )),
301 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
302 event.clone(),
303 &wrapper,
304 &request_body,
305 first_event_at,
306 )),
307 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
308 event.clone(),
309 &wrapper,
310 &request_body,
311 first_event_at,
312 )),
313 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
314 event.clone(),
315 &wrapper,
316 &request_body,
317 first_event_at,
318 )),
319 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
320 event.clone(),
321 &wrapper,
322 &request_body,
323 first_event_at,
324 )),
325 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
326 event.clone(),
327 &wrapper,
328 &request_body,
329 first_event_at,
330 )),
331 }
332 }
333
334 to_upload
335 .upload(&clickhouse_client)
336 .await
337 .map_err(|err| Error::Internal(anyhow!(err)))?;
338
339 Ok(())
340}
341
342#[derive(Default)]
343struct ToUpload {
344 editor_events: Vec<EditorEventRow>,
345 copilot_events: Vec<CopilotEventRow>,
346 assistant_events: Vec<AssistantEventRow>,
347 call_events: Vec<CallEventRow>,
348 cpu_events: Vec<CpuEventRow>,
349 memory_events: Vec<MemoryEventRow>,
350 app_events: Vec<AppEventRow>,
351 setting_events: Vec<SettingEventRow>,
352 edit_events: Vec<EditEventRow>,
353 action_events: Vec<ActionEventRow>,
354}
355
356impl ToUpload {
357 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
358 Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
359 .await
360 .with_context(|| format!("failed to upload to table 'editor_events'"))?;
361 Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
362 .await
363 .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
364 Self::upload_to_table(
365 "assistant_events",
366 &self.assistant_events,
367 clickhouse_client,
368 )
369 .await
370 .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
371 Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
372 .await
373 .with_context(|| format!("failed to upload to table 'call_events'"))?;
374 Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
375 .await
376 .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
377 Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
378 .await
379 .with_context(|| format!("failed to upload to table 'memory_events'"))?;
380 Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
381 .await
382 .with_context(|| format!("failed to upload to table 'app_events'"))?;
383 Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
384 .await
385 .with_context(|| format!("failed to upload to table 'setting_events'"))?;
386 Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
387 .await
388 .with_context(|| format!("failed to upload to table 'edit_events'"))?;
389 Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
390 .await
391 .with_context(|| format!("failed to upload to table 'action_events'"))?;
392 Ok(())
393 }
394
395 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
396 table: &str,
397 rows: &[T],
398 clickhouse_client: &clickhouse::Client,
399 ) -> anyhow::Result<()> {
400 if !rows.is_empty() {
401 let mut insert = clickhouse_client.insert(table)?;
402
403 for event in rows {
404 insert.write(event).await?;
405 }
406
407 insert.end().await?;
408 }
409
410 Ok(())
411 }
412}
413
414pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
415where
416 S: Serializer,
417{
418 if country_code.len() != 2 {
419 use serde::ser::Error;
420 return Err(S::Error::custom(
421 "country_code must be exactly 2 characters",
422 ));
423 }
424
425 let country_code = country_code.as_bytes();
426
427 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
428}
429
430#[derive(Serialize, Debug, clickhouse::Row)]
431pub struct EditorEventRow {
432 pub installation_id: String,
433 pub operation: String,
434 pub app_version: String,
435 pub file_extension: String,
436 pub os_name: String,
437 pub os_version: String,
438 pub release_channel: String,
439 pub signed_in: bool,
440 pub vim_mode: bool,
441 #[serde(serialize_with = "serialize_country_code")]
442 pub country_code: String,
443 pub region_code: String,
444 pub city: String,
445 pub time: i64,
446 pub copilot_enabled: bool,
447 pub copilot_enabled_for_language: bool,
448 pub historical_event: bool,
449 pub architecture: String,
450 pub is_staff: Option<bool>,
451 pub session_id: Option<String>,
452 pub major: Option<i32>,
453 pub minor: Option<i32>,
454 pub patch: Option<i32>,
455}
456
457impl EditorEventRow {
458 fn from_event(
459 event: EditorEvent,
460 wrapper: &EventWrapper,
461 body: &EventRequestBody,
462 first_event_at: chrono::DateTime<chrono::Utc>,
463 country_code: Option<String>,
464 ) -> Self {
465 let semver = body.semver();
466 let time =
467 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
468
469 Self {
470 app_version: body.app_version.clone(),
471 major: semver.map(|s| s.major as i32),
472 minor: semver.map(|s| s.minor as i32),
473 patch: semver.map(|s| s.patch as i32),
474 release_channel: body.release_channel.clone().unwrap_or_default(),
475 os_name: body.os_name.clone(),
476 os_version: body.os_version.clone().unwrap_or_default(),
477 architecture: body.architecture.clone(),
478 installation_id: body.installation_id.clone().unwrap_or_default(),
479 session_id: body.session_id.clone(),
480 is_staff: body.is_staff,
481 time: time.timestamp_millis(),
482 operation: event.operation,
483 file_extension: event.file_extension.unwrap_or_default(),
484 signed_in: wrapper.signed_in,
485 vim_mode: event.vim_mode,
486 copilot_enabled: event.copilot_enabled,
487 copilot_enabled_for_language: event.copilot_enabled_for_language,
488 country_code: country_code.unwrap_or("XX".to_string()),
489 region_code: "".to_string(),
490 city: "".to_string(),
491 historical_event: false,
492 }
493 }
494}
495
496#[derive(Serialize, Debug, clickhouse::Row)]
497pub struct CopilotEventRow {
498 pub installation_id: String,
499 pub suggestion_id: String,
500 pub suggestion_accepted: bool,
501 pub app_version: String,
502 pub file_extension: String,
503 pub os_name: String,
504 pub os_version: String,
505 pub release_channel: String,
506 pub signed_in: bool,
507 #[serde(serialize_with = "serialize_country_code")]
508 pub country_code: String,
509 pub region_code: String,
510 pub city: String,
511 pub time: i64,
512 pub is_staff: Option<bool>,
513 pub session_id: Option<String>,
514 pub major: Option<i32>,
515 pub minor: Option<i32>,
516 pub patch: Option<i32>,
517}
518
519impl CopilotEventRow {
520 fn from_event(
521 event: CopilotEvent,
522 wrapper: &EventWrapper,
523 body: &EventRequestBody,
524 first_event_at: chrono::DateTime<chrono::Utc>,
525 country_code: Option<String>,
526 ) -> Self {
527 let semver = body.semver();
528 let time =
529 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
530
531 Self {
532 app_version: body.app_version.clone(),
533 major: semver.map(|s| s.major as i32),
534 minor: semver.map(|s| s.minor as i32),
535 patch: semver.map(|s| s.patch as i32),
536 release_channel: body.release_channel.clone().unwrap_or_default(),
537 os_name: body.os_name.clone(),
538 os_version: body.os_version.clone().unwrap_or_default(),
539 installation_id: body.installation_id.clone().unwrap_or_default(),
540 session_id: body.session_id.clone(),
541 is_staff: body.is_staff,
542 time: time.timestamp_millis(),
543 file_extension: event.file_extension.unwrap_or_default(),
544 signed_in: wrapper.signed_in,
545 country_code: country_code.unwrap_or("XX".to_string()),
546 region_code: "".to_string(),
547 city: "".to_string(),
548 suggestion_id: event.suggestion_id.unwrap_or_default(),
549 suggestion_accepted: event.suggestion_accepted,
550 }
551 }
552}
553
554#[derive(Serialize, Debug, clickhouse::Row)]
555pub struct CallEventRow {
556 // AppInfoBase
557 app_version: String,
558 major: Option<i32>,
559 minor: Option<i32>,
560 patch: Option<i32>,
561 release_channel: String,
562
563 // ClientEventBase
564 installation_id: String,
565 session_id: Option<String>,
566 is_staff: Option<bool>,
567 time: i64,
568
569 // CallEventRow
570 operation: String,
571 room_id: Option<u64>,
572 channel_id: Option<u64>,
573}
574
575impl CallEventRow {
576 fn from_event(
577 event: CallEvent,
578 wrapper: &EventWrapper,
579 body: &EventRequestBody,
580 first_event_at: chrono::DateTime<chrono::Utc>,
581 ) -> Self {
582 let semver = body.semver();
583 let time =
584 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
585
586 Self {
587 app_version: body.app_version.clone(),
588 major: semver.map(|s| s.major as i32),
589 minor: semver.map(|s| s.minor as i32),
590 patch: semver.map(|s| s.patch as i32),
591 release_channel: body.release_channel.clone().unwrap_or_default(),
592 installation_id: body.installation_id.clone().unwrap_or_default(),
593 session_id: body.session_id.clone(),
594 is_staff: body.is_staff,
595 time: time.timestamp_millis(),
596 operation: event.operation,
597 room_id: event.room_id,
598 channel_id: event.channel_id,
599 }
600 }
601}
602
603#[derive(Serialize, Debug, clickhouse::Row)]
604pub struct AssistantEventRow {
605 // AppInfoBase
606 app_version: String,
607 major: Option<i32>,
608 minor: Option<i32>,
609 patch: Option<i32>,
610 release_channel: String,
611
612 // ClientEventBase
613 installation_id: Option<String>,
614 session_id: Option<String>,
615 is_staff: Option<bool>,
616 time: i64,
617
618 // AssistantEventRow
619 conversation_id: String,
620 kind: String,
621 model: String,
622}
623
624impl AssistantEventRow {
625 fn from_event(
626 event: AssistantEvent,
627 wrapper: &EventWrapper,
628 body: &EventRequestBody,
629 first_event_at: chrono::DateTime<chrono::Utc>,
630 ) -> Self {
631 let semver = body.semver();
632 let time =
633 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
634
635 Self {
636 app_version: body.app_version.clone(),
637 major: semver.map(|s| s.major as i32),
638 minor: semver.map(|s| s.minor as i32),
639 patch: semver.map(|s| s.patch as i32),
640 release_channel: body.release_channel.clone().unwrap_or_default(),
641 installation_id: body.installation_id.clone(),
642 session_id: body.session_id.clone(),
643 is_staff: body.is_staff,
644 time: time.timestamp_millis(),
645 conversation_id: event.conversation_id.unwrap_or_default(),
646 kind: event.kind.to_string(),
647 model: event.model,
648 }
649 }
650}
651
652#[derive(Debug, clickhouse::Row, Serialize)]
653pub struct CpuEventRow {
654 pub installation_id: Option<String>,
655 pub is_staff: Option<bool>,
656 pub usage_as_percentage: f32,
657 pub core_count: u32,
658 pub app_version: String,
659 pub release_channel: String,
660 pub time: i64,
661 pub session_id: Option<String>,
662 // pub normalized_cpu_usage: f64, MATERIALIZED
663 pub major: Option<i32>,
664 pub minor: Option<i32>,
665 pub patch: Option<i32>,
666}
667
668impl CpuEventRow {
669 fn from_event(
670 event: CpuEvent,
671 wrapper: &EventWrapper,
672 body: &EventRequestBody,
673 first_event_at: chrono::DateTime<chrono::Utc>,
674 ) -> Self {
675 let semver = body.semver();
676 let time =
677 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
678
679 Self {
680 app_version: body.app_version.clone(),
681 major: semver.map(|s| s.major as i32),
682 minor: semver.map(|s| s.minor as i32),
683 patch: semver.map(|s| s.patch as i32),
684 release_channel: body.release_channel.clone().unwrap_or_default(),
685 installation_id: body.installation_id.clone(),
686 session_id: body.session_id.clone(),
687 is_staff: body.is_staff,
688 time: time.timestamp_millis(),
689 usage_as_percentage: event.usage_as_percentage,
690 core_count: event.core_count,
691 }
692 }
693}
694
695#[derive(Serialize, Debug, clickhouse::Row)]
696pub struct MemoryEventRow {
697 // AppInfoBase
698 app_version: String,
699 major: Option<i32>,
700 minor: Option<i32>,
701 patch: Option<i32>,
702 release_channel: String,
703
704 // ClientEventBase
705 installation_id: Option<String>,
706 session_id: Option<String>,
707 is_staff: Option<bool>,
708 time: i64,
709
710 // MemoryEventRow
711 memory_in_bytes: u64,
712 virtual_memory_in_bytes: u64,
713}
714
715impl MemoryEventRow {
716 fn from_event(
717 event: MemoryEvent,
718 wrapper: &EventWrapper,
719 body: &EventRequestBody,
720 first_event_at: chrono::DateTime<chrono::Utc>,
721 ) -> Self {
722 let semver = body.semver();
723 let time =
724 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
725
726 Self {
727 app_version: body.app_version.clone(),
728 major: semver.map(|s| s.major as i32),
729 minor: semver.map(|s| s.minor as i32),
730 patch: semver.map(|s| s.patch as i32),
731 release_channel: body.release_channel.clone().unwrap_or_default(),
732 installation_id: body.installation_id.clone(),
733 session_id: body.session_id.clone(),
734 is_staff: body.is_staff,
735 time: time.timestamp_millis(),
736 memory_in_bytes: event.memory_in_bytes,
737 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
738 }
739 }
740}
741
742#[derive(Serialize, Debug, clickhouse::Row)]
743pub struct AppEventRow {
744 // AppInfoBase
745 app_version: String,
746 major: Option<i32>,
747 minor: Option<i32>,
748 patch: Option<i32>,
749 release_channel: String,
750
751 // ClientEventBase
752 installation_id: Option<String>,
753 session_id: Option<String>,
754 is_staff: Option<bool>,
755 time: i64,
756
757 // AppEventRow
758 operation: String,
759}
760
761impl AppEventRow {
762 fn from_event(
763 event: AppEvent,
764 wrapper: &EventWrapper,
765 body: &EventRequestBody,
766 first_event_at: chrono::DateTime<chrono::Utc>,
767 ) -> Self {
768 let semver = body.semver();
769 let time =
770 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
771
772 Self {
773 app_version: body.app_version.clone(),
774 major: semver.map(|s| s.major as i32),
775 minor: semver.map(|s| s.minor as i32),
776 patch: semver.map(|s| s.patch as i32),
777 release_channel: body.release_channel.clone().unwrap_or_default(),
778 installation_id: body.installation_id.clone(),
779 session_id: body.session_id.clone(),
780 is_staff: body.is_staff,
781 time: time.timestamp_millis(),
782 operation: event.operation,
783 }
784 }
785}
786
787#[derive(Serialize, Debug, clickhouse::Row)]
788pub struct SettingEventRow {
789 // AppInfoBase
790 app_version: String,
791 major: Option<i32>,
792 minor: Option<i32>,
793 patch: Option<i32>,
794 release_channel: String,
795
796 // ClientEventBase
797 installation_id: Option<String>,
798 session_id: Option<String>,
799 is_staff: Option<bool>,
800 time: i64,
801 // SettingEventRow
802 setting: String,
803 value: String,
804}
805
806impl SettingEventRow {
807 fn from_event(
808 event: SettingEvent,
809 wrapper: &EventWrapper,
810 body: &EventRequestBody,
811 first_event_at: chrono::DateTime<chrono::Utc>,
812 ) -> Self {
813 let semver = body.semver();
814 let time =
815 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
816
817 Self {
818 app_version: body.app_version.clone(),
819 major: semver.map(|s| s.major as i32),
820 minor: semver.map(|s| s.minor as i32),
821 patch: semver.map(|s| s.patch as i32),
822 release_channel: body.release_channel.clone().unwrap_or_default(),
823 installation_id: body.installation_id.clone(),
824 session_id: body.session_id.clone(),
825 is_staff: body.is_staff,
826 time: time.timestamp_millis(),
827 setting: event.setting,
828 value: event.value,
829 }
830 }
831}
832
833#[derive(Serialize, Debug, clickhouse::Row)]
834pub struct EditEventRow {
835 // AppInfoBase
836 app_version: String,
837 major: Option<i32>,
838 minor: Option<i32>,
839 patch: Option<i32>,
840 release_channel: String,
841
842 // ClientEventBase
843 installation_id: Option<String>,
844 // Note: This column name has a typo in the ClickHouse table.
845 #[serde(rename = "sesssion_id")]
846 session_id: Option<String>,
847 is_staff: Option<bool>,
848 time: i64,
849
850 // EditEventRow
851 period_start: i64,
852 period_end: i64,
853 environment: String,
854}
855
856impl EditEventRow {
857 fn from_event(
858 event: EditEvent,
859 wrapper: &EventWrapper,
860 body: &EventRequestBody,
861 first_event_at: chrono::DateTime<chrono::Utc>,
862 ) -> Self {
863 let semver = body.semver();
864 let time =
865 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
866
867 let period_start = time - chrono::Duration::milliseconds(event.duration);
868 let period_end = time;
869
870 Self {
871 app_version: body.app_version.clone(),
872 major: semver.map(|s| s.major as i32),
873 minor: semver.map(|s| s.minor as i32),
874 patch: semver.map(|s| s.patch as i32),
875 release_channel: body.release_channel.clone().unwrap_or_default(),
876 installation_id: body.installation_id.clone(),
877 session_id: body.session_id.clone(),
878 is_staff: body.is_staff,
879 time: time.timestamp_millis(),
880 period_start: period_start.timestamp_millis(),
881 period_end: period_end.timestamp_millis(),
882 environment: event.environment,
883 }
884 }
885}
886
887#[derive(Serialize, Debug, clickhouse::Row)]
888pub struct ActionEventRow {
889 // AppInfoBase
890 app_version: String,
891 major: Option<i32>,
892 minor: Option<i32>,
893 patch: Option<i32>,
894 release_channel: String,
895
896 // ClientEventBase
897 installation_id: Option<String>,
898 // Note: This column name has a typo in the ClickHouse table.
899 #[serde(rename = "sesssion_id")]
900 session_id: Option<String>,
901 is_staff: Option<bool>,
902 time: i64,
903 // ActionEventRow
904 source: String,
905 action: String,
906}
907
908impl ActionEventRow {
909 fn from_event(
910 event: ActionEvent,
911 wrapper: &EventWrapper,
912 body: &EventRequestBody,
913 first_event_at: chrono::DateTime<chrono::Utc>,
914 ) -> Self {
915 let semver = body.semver();
916 let time =
917 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
918
919 Self {
920 app_version: body.app_version.clone(),
921 major: semver.map(|s| s.major as i32),
922 minor: semver.map(|s| s.minor as i32),
923 patch: semver.map(|s| s.patch as i32),
924 release_channel: body.release_channel.clone().unwrap_or_default(),
925 installation_id: body.installation_id.clone(),
926 session_id: body.session_id.clone(),
927 is_staff: body.is_staff,
928 time: time.timestamp_millis(),
929 source: event.source,
930 action: event.action,
931 }
932 }
933}