1use std::sync::Arc;
2
3use anyhow::{anyhow, Context};
4use aws_sdk_s3::primitives::ByteStream;
5use axum::{
6 body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
7};
8use hyper::StatusCode;
9use hyper::{HeaderMap, StatusCode};
10use serde::{Serialize, Serializer};
11use sha2::{Digest, Sha256};
12use telemetry_events::{
13 ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
14 EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
15};
16use util::SemanticVersion;
17
18use crate::{api::slack, AppState, Error, Result};
19
20use super::ips_file::IpsFile;
21
22pub fn router() -> Router {
23 Router::new()
24 .route("/telemetry/events", post(post_events))
25 .route("/telemetry/crashes", post(post_crash))
26}
27
28lazy_static! {
29 static ref ZED_CHECKSUM_HEADER: HeaderName = HeaderName::from_static("x-zed-checksum");
30 static ref CLOUDFLARE_IP_COUNTRY_HEADER: HeaderName = HeaderName::from_static("cf-ipcountry");
31}
32
33pub struct ZedChecksumHeader(Vec<u8>);
34
35impl Header for ZedChecksumHeader {
36 fn name() -> &'static HeaderName {
37 &ZED_CHECKSUM_HEADER
38 }
39
40 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
41 where
42 Self: Sized,
43 I: Iterator<Item = &'i axum::http::HeaderValue>,
44 {
45 let checksum = values
46 .next()
47 .ok_or_else(axum::headers::Error::invalid)?
48 .to_str()
49 .map_err(|_| axum::headers::Error::invalid())?;
50
51 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
52 Ok(Self(bytes))
53 }
54
55 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
56 unimplemented!()
57 }
58}
59
60pub struct CloudflareIpCountryHeader(String);
61
62impl Header for CloudflareIpCountryHeader {
63 fn name() -> &'static HeaderName {
64 &CLOUDFLARE_IP_COUNTRY_HEADER
65 }
66
67 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
68 where
69 Self: Sized,
70 I: Iterator<Item = &'i axum::http::HeaderValue>,
71 {
72 let country_code = values
73 .next()
74 .ok_or_else(axum::headers::Error::invalid)?
75 .to_str()
76 .map_err(|_| axum::headers::Error::invalid())?;
77
78 Ok(Self(country_code.to_string()))
79 }
80
81 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
82 unimplemented!()
83 }
84}
85
86pub async fn post_crash(
87 Extension(app): Extension<Arc<AppState>>,
88 body: Bytes,
89 headers: HeaderMap,
90) -> Result<()> {
91 static CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
92
93 let report = IpsFile::parse(&body)?;
94 let version_threshold = SemanticVersion::new(0, 123, 0);
95
96 let bundle_id = &report.header.bundle_id;
97 let app_version = &report.app_version();
98
99 if bundle_id == "dev.zed.Zed-Dev" {
100 log::error!("Crash uploads from {} are ignored.", bundle_id);
101 return Ok(());
102 }
103
104 if app_version.is_none() || app_version.unwrap() < version_threshold {
105 log::error!(
106 "Crash uploads from {} are ignored.",
107 report.header.app_version
108 );
109 return Ok(());
110 }
111 let app_version = app_version.unwrap();
112
113 if let Some(blob_store_client) = app.blob_store_client.as_ref() {
114 let response = blob_store_client
115 .head_object()
116 .bucket(CRASH_REPORTS_BUCKET)
117 .key(report.header.incident_id.clone() + ".ips")
118 .send()
119 .await;
120
121 if response.is_ok() {
122 log::info!("We've already uploaded this crash");
123 return Ok(());
124 }
125
126 blob_store_client
127 .put_object()
128 .bucket(CRASH_REPORTS_BUCKET)
129 .key(report.header.incident_id.clone() + ".ips")
130 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
131 .body(ByteStream::from(body.to_vec()))
132 .send()
133 .await
134 .map_err(|e| log::error!("Failed to upload crash: {}", e))
135 .ok();
136 }
137
138 let recent_panic_on: Option<i64> = headers
139 .get("x-zed-panicked-on")
140 .and_then(|h| h.to_str().ok())
141 .and_then(|s| s.parse().ok());
142 let mut recent_panic = None;
143
144 if let Some(recent_panic_on) = recent_panic_on {
145 let crashed_at = match report.timestamp() {
146 Ok(t) => Some(t),
147 Err(e) => {
148 log::error!("Can't parse {}: {}", report.header.timestamp, e);
149 None
150 }
151 };
152 if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
153 recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
154 }
155 }
156
157 let description = report.description(recent_panic);
158 let summary = report.backtrace_summary();
159
160 tracing::error!(
161 service = "client",
162 version = %report.header.app_version,
163 os_version = %report.header.os_version,
164 bundle_id = %report.header.bundle_id,
165 incident_id = %report.header.incident_id,
166 description = %description,
167 backtrace = %summary,
168 "crash report");
169
170 if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
171 let payload = slack::WebhookBody::new(|w| {
172 w.add_section(|s| s.text(slack::Text::markdown(description)))
173 .add_section(|s| {
174 s.add_field(slack::Text::markdown(format!(
175 "*Version:*\n{} ({})",
176 bundle_id, app_version
177 )))
178 .add_field({
179 let hostname = app.config.blob_store_url.clone().unwrap_or_default();
180 let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
181 hostname.strip_prefix("http://").unwrap_or_default()
182 });
183
184 slack::Text::markdown(format!(
185 "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
186 CRASH_REPORTS_BUCKET,
187 hostname,
188 report.header.incident_id,
189 report
190 .header
191 .incident_id
192 .chars()
193 .take(8)
194 .collect::<String>(),
195 ))
196 })
197 })
198 .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
199 });
200 let payload_json = serde_json::to_string(&payload).map_err(|err| {
201 log::error!("Failed to serialize payload to JSON: {err}");
202 Error::Internal(anyhow!(err))
203 })?;
204
205 reqwest::Client::new()
206 .post(slack_panics_webhook)
207 .header("Content-Type", "application/json")
208 .body(payload_json)
209 .send()
210 .await
211 .map_err(|err| {
212 log::error!("Failed to send payload to Slack: {err}");
213 Error::Internal(anyhow!(err))
214 })?;
215 }
216
217 Ok(())
218}
219
220pub async fn post_events(
221 Extension(app): Extension<Arc<AppState>>,
222 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
223 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
224 body: Bytes,
225) -> Result<()> {
226 let Some(clickhouse_client) = app.clickhouse_client.clone() else {
227 Err(Error::Http(
228 StatusCode::NOT_IMPLEMENTED,
229 "not supported".into(),
230 ))?
231 };
232
233 let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
234 return Err(Error::Http(
235 StatusCode::INTERNAL_SERVER_ERROR,
236 "events not enabled".into(),
237 ))?;
238 };
239
240 let mut summer = Sha256::new();
241 summer.update(checksum_seed);
242 summer.update(&body);
243 summer.update(checksum_seed);
244
245 if &checksum != &summer.finalize()[..] {
246 return Err(Error::Http(
247 StatusCode::BAD_REQUEST,
248 "invalid checksum".into(),
249 ))?;
250 }
251
252 let request_body: telemetry_events::EventRequestBody =
253 serde_json::from_slice(&body).map_err(|err| {
254 log::error!("can't parse event json: {err}");
255 Error::Internal(anyhow!(err))
256 })?;
257
258 let mut to_upload = ToUpload::default();
259 let Some(last_event) = request_body.events.last() else {
260 return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
261 };
262 let country_code = country_code_header.map(|h| h.0 .0);
263
264 let first_event_at = chrono::Utc::now()
265 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
266
267 for wrapper in &request_body.events {
268 match &wrapper.event {
269 Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
270 event.clone(),
271 &wrapper,
272 &request_body,
273 first_event_at,
274 country_code.clone(),
275 )),
276 Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
277 event.clone(),
278 &wrapper,
279 &request_body,
280 first_event_at,
281 country_code.clone(),
282 )),
283 Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
284 event.clone(),
285 &wrapper,
286 &request_body,
287 first_event_at,
288 )),
289 Event::Assistant(event) => {
290 to_upload
291 .assistant_events
292 .push(AssistantEventRow::from_event(
293 event.clone(),
294 &wrapper,
295 &request_body,
296 first_event_at,
297 ))
298 }
299 Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
300 event.clone(),
301 &wrapper,
302 &request_body,
303 first_event_at,
304 )),
305 Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
306 event.clone(),
307 &wrapper,
308 &request_body,
309 first_event_at,
310 )),
311 Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
312 event.clone(),
313 &wrapper,
314 &request_body,
315 first_event_at,
316 )),
317 Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
318 event.clone(),
319 &wrapper,
320 &request_body,
321 first_event_at,
322 )),
323 Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
324 event.clone(),
325 &wrapper,
326 &request_body,
327 first_event_at,
328 )),
329 Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
330 event.clone(),
331 &wrapper,
332 &request_body,
333 first_event_at,
334 )),
335 }
336 }
337
338 to_upload
339 .upload(&clickhouse_client)
340 .await
341 .map_err(|err| Error::Internal(anyhow!(err)))?;
342
343 Ok(())
344}
345
346#[derive(Default)]
347struct ToUpload {
348 editor_events: Vec<EditorEventRow>,
349 copilot_events: Vec<CopilotEventRow>,
350 assistant_events: Vec<AssistantEventRow>,
351 call_events: Vec<CallEventRow>,
352 cpu_events: Vec<CpuEventRow>,
353 memory_events: Vec<MemoryEventRow>,
354 app_events: Vec<AppEventRow>,
355 setting_events: Vec<SettingEventRow>,
356 edit_events: Vec<EditEventRow>,
357 action_events: Vec<ActionEventRow>,
358}
359
360impl ToUpload {
361 pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
362 Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
363 .await
364 .with_context(|| format!("failed to upload to table 'editor_events'"))?;
365 Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
366 .await
367 .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
368 Self::upload_to_table(
369 "assistant_events",
370 &self.assistant_events,
371 clickhouse_client,
372 )
373 .await
374 .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
375 Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
376 .await
377 .with_context(|| format!("failed to upload to table 'call_events'"))?;
378 Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
379 .await
380 .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
381 Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
382 .await
383 .with_context(|| format!("failed to upload to table 'memory_events'"))?;
384 Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
385 .await
386 .with_context(|| format!("failed to upload to table 'app_events'"))?;
387 Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
388 .await
389 .with_context(|| format!("failed to upload to table 'setting_events'"))?;
390 Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
391 .await
392 .with_context(|| format!("failed to upload to table 'edit_events'"))?;
393 Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
394 .await
395 .with_context(|| format!("failed to upload to table 'action_events'"))?;
396 Ok(())
397 }
398
399 async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
400 table: &str,
401 rows: &[T],
402 clickhouse_client: &clickhouse::Client,
403 ) -> anyhow::Result<()> {
404 if !rows.is_empty() {
405 let mut insert = clickhouse_client.insert(table)?;
406
407 for event in rows {
408 insert.write(event).await?;
409 }
410
411 insert.end().await?;
412 }
413
414 Ok(())
415 }
416}
417
418pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
419where
420 S: Serializer,
421{
422 if country_code.len() != 2 {
423 use serde::ser::Error;
424 return Err(S::Error::custom(
425 "country_code must be exactly 2 characters",
426 ));
427 }
428
429 let country_code = country_code.as_bytes();
430
431 serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
432}
433
434#[derive(Serialize, Debug, clickhouse::Row)]
435pub struct EditorEventRow {
436 pub installation_id: String,
437 pub operation: String,
438 pub app_version: String,
439 pub file_extension: String,
440 pub os_name: String,
441 pub os_version: String,
442 pub release_channel: String,
443 pub signed_in: bool,
444 pub vim_mode: bool,
445 #[serde(serialize_with = "serialize_country_code")]
446 pub country_code: String,
447 pub region_code: String,
448 pub city: String,
449 pub time: i64,
450 pub copilot_enabled: bool,
451 pub copilot_enabled_for_language: bool,
452 pub historical_event: bool,
453 pub architecture: String,
454 pub is_staff: Option<bool>,
455 pub session_id: Option<String>,
456 pub major: Option<i32>,
457 pub minor: Option<i32>,
458 pub patch: Option<i32>,
459}
460
461impl EditorEventRow {
462 fn from_event(
463 event: EditorEvent,
464 wrapper: &EventWrapper,
465 body: &EventRequestBody,
466 first_event_at: chrono::DateTime<chrono::Utc>,
467 country_code: Option<String>,
468 ) -> Self {
469 let semver = body.semver();
470 let time =
471 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
472
473 Self {
474 app_version: body.app_version.clone(),
475 major: semver.map(|s| s.major as i32),
476 minor: semver.map(|s| s.minor as i32),
477 patch: semver.map(|s| s.patch as i32),
478 release_channel: body.release_channel.clone().unwrap_or_default(),
479 os_name: body.os_name.clone(),
480 os_version: body.os_version.clone().unwrap_or_default(),
481 architecture: body.architecture.clone(),
482 installation_id: body.installation_id.clone().unwrap_or_default(),
483 session_id: body.session_id.clone(),
484 is_staff: body.is_staff,
485 time: time.timestamp_millis(),
486 operation: event.operation,
487 file_extension: event.file_extension.unwrap_or_default(),
488 signed_in: wrapper.signed_in,
489 vim_mode: event.vim_mode,
490 copilot_enabled: event.copilot_enabled,
491 copilot_enabled_for_language: event.copilot_enabled_for_language,
492 country_code: country_code.unwrap_or("XX".to_string()),
493 region_code: "".to_string(),
494 city: "".to_string(),
495 historical_event: false,
496 }
497 }
498}
499
500#[derive(Serialize, Debug, clickhouse::Row)]
501pub struct CopilotEventRow {
502 pub installation_id: String,
503 pub suggestion_id: String,
504 pub suggestion_accepted: bool,
505 pub app_version: String,
506 pub file_extension: String,
507 pub os_name: String,
508 pub os_version: String,
509 pub release_channel: String,
510 pub signed_in: bool,
511 #[serde(serialize_with = "serialize_country_code")]
512 pub country_code: String,
513 pub region_code: String,
514 pub city: String,
515 pub time: i64,
516 pub is_staff: Option<bool>,
517 pub session_id: Option<String>,
518 pub major: Option<i32>,
519 pub minor: Option<i32>,
520 pub patch: Option<i32>,
521}
522
523impl CopilotEventRow {
524 fn from_event(
525 event: CopilotEvent,
526 wrapper: &EventWrapper,
527 body: &EventRequestBody,
528 first_event_at: chrono::DateTime<chrono::Utc>,
529 country_code: Option<String>,
530 ) -> Self {
531 let semver = body.semver();
532 let time =
533 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
534
535 Self {
536 app_version: body.app_version.clone(),
537 major: semver.map(|s| s.major as i32),
538 minor: semver.map(|s| s.minor as i32),
539 patch: semver.map(|s| s.patch as i32),
540 release_channel: body.release_channel.clone().unwrap_or_default(),
541 os_name: body.os_name.clone(),
542 os_version: body.os_version.clone().unwrap_or_default(),
543 installation_id: body.installation_id.clone().unwrap_or_default(),
544 session_id: body.session_id.clone(),
545 is_staff: body.is_staff,
546 time: time.timestamp_millis(),
547 file_extension: event.file_extension.unwrap_or_default(),
548 signed_in: wrapper.signed_in,
549 country_code: country_code.unwrap_or("XX".to_string()),
550 region_code: "".to_string(),
551 city: "".to_string(),
552 suggestion_id: event.suggestion_id.unwrap_or_default(),
553 suggestion_accepted: event.suggestion_accepted,
554 }
555 }
556}
557
558#[derive(Serialize, Debug, clickhouse::Row)]
559pub struct CallEventRow {
560 // AppInfoBase
561 app_version: String,
562 major: Option<i32>,
563 minor: Option<i32>,
564 patch: Option<i32>,
565 release_channel: String,
566
567 // ClientEventBase
568 installation_id: String,
569 session_id: Option<String>,
570 is_staff: Option<bool>,
571 time: i64,
572
573 // CallEventRow
574 operation: String,
575 room_id: Option<u64>,
576 channel_id: Option<u64>,
577}
578
579impl CallEventRow {
580 fn from_event(
581 event: CallEvent,
582 wrapper: &EventWrapper,
583 body: &EventRequestBody,
584 first_event_at: chrono::DateTime<chrono::Utc>,
585 ) -> Self {
586 let semver = body.semver();
587 let time =
588 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
589
590 Self {
591 app_version: body.app_version.clone(),
592 major: semver.map(|s| s.major as i32),
593 minor: semver.map(|s| s.minor as i32),
594 patch: semver.map(|s| s.patch as i32),
595 release_channel: body.release_channel.clone().unwrap_or_default(),
596 installation_id: body.installation_id.clone().unwrap_or_default(),
597 session_id: body.session_id.clone(),
598 is_staff: body.is_staff,
599 time: time.timestamp_millis(),
600 operation: event.operation,
601 room_id: event.room_id,
602 channel_id: event.channel_id,
603 }
604 }
605}
606
607#[derive(Serialize, Debug, clickhouse::Row)]
608pub struct AssistantEventRow {
609 // AppInfoBase
610 app_version: String,
611 major: Option<i32>,
612 minor: Option<i32>,
613 patch: Option<i32>,
614 release_channel: String,
615
616 // ClientEventBase
617 installation_id: Option<String>,
618 session_id: Option<String>,
619 is_staff: Option<bool>,
620 time: i64,
621
622 // AssistantEventRow
623 conversation_id: String,
624 kind: String,
625 model: String,
626}
627
628impl AssistantEventRow {
629 fn from_event(
630 event: AssistantEvent,
631 wrapper: &EventWrapper,
632 body: &EventRequestBody,
633 first_event_at: chrono::DateTime<chrono::Utc>,
634 ) -> Self {
635 let semver = body.semver();
636 let time =
637 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
638
639 Self {
640 app_version: body.app_version.clone(),
641 major: semver.map(|s| s.major as i32),
642 minor: semver.map(|s| s.minor as i32),
643 patch: semver.map(|s| s.patch as i32),
644 release_channel: body.release_channel.clone().unwrap_or_default(),
645 installation_id: body.installation_id.clone(),
646 session_id: body.session_id.clone(),
647 is_staff: body.is_staff,
648 time: time.timestamp_millis(),
649 conversation_id: event.conversation_id.unwrap_or_default(),
650 kind: event.kind.to_string(),
651 model: event.model,
652 }
653 }
654}
655
656#[derive(Debug, clickhouse::Row, Serialize)]
657pub struct CpuEventRow {
658 pub installation_id: Option<String>,
659 pub is_staff: Option<bool>,
660 pub usage_as_percentage: f32,
661 pub core_count: u32,
662 pub app_version: String,
663 pub release_channel: String,
664 pub time: i64,
665 pub session_id: Option<String>,
666 // pub normalized_cpu_usage: f64, MATERIALIZED
667 pub major: Option<i32>,
668 pub minor: Option<i32>,
669 pub patch: Option<i32>,
670}
671
672impl CpuEventRow {
673 fn from_event(
674 event: CpuEvent,
675 wrapper: &EventWrapper,
676 body: &EventRequestBody,
677 first_event_at: chrono::DateTime<chrono::Utc>,
678 ) -> Self {
679 let semver = body.semver();
680 let time =
681 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
682
683 Self {
684 app_version: body.app_version.clone(),
685 major: semver.map(|s| s.major as i32),
686 minor: semver.map(|s| s.minor as i32),
687 patch: semver.map(|s| s.patch as i32),
688 release_channel: body.release_channel.clone().unwrap_or_default(),
689 installation_id: body.installation_id.clone(),
690 session_id: body.session_id.clone(),
691 is_staff: body.is_staff,
692 time: time.timestamp_millis(),
693 usage_as_percentage: event.usage_as_percentage,
694 core_count: event.core_count,
695 }
696 }
697}
698
699#[derive(Serialize, Debug, clickhouse::Row)]
700pub struct MemoryEventRow {
701 // AppInfoBase
702 app_version: String,
703 major: Option<i32>,
704 minor: Option<i32>,
705 patch: Option<i32>,
706 release_channel: String,
707
708 // ClientEventBase
709 installation_id: Option<String>,
710 session_id: Option<String>,
711 is_staff: Option<bool>,
712 time: i64,
713
714 // MemoryEventRow
715 memory_in_bytes: u64,
716 virtual_memory_in_bytes: u64,
717}
718
719impl MemoryEventRow {
720 fn from_event(
721 event: MemoryEvent,
722 wrapper: &EventWrapper,
723 body: &EventRequestBody,
724 first_event_at: chrono::DateTime<chrono::Utc>,
725 ) -> Self {
726 let semver = body.semver();
727 let time =
728 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
729
730 Self {
731 app_version: body.app_version.clone(),
732 major: semver.map(|s| s.major as i32),
733 minor: semver.map(|s| s.minor as i32),
734 patch: semver.map(|s| s.patch as i32),
735 release_channel: body.release_channel.clone().unwrap_or_default(),
736 installation_id: body.installation_id.clone(),
737 session_id: body.session_id.clone(),
738 is_staff: body.is_staff,
739 time: time.timestamp_millis(),
740 memory_in_bytes: event.memory_in_bytes,
741 virtual_memory_in_bytes: event.virtual_memory_in_bytes,
742 }
743 }
744}
745
746#[derive(Serialize, Debug, clickhouse::Row)]
747pub struct AppEventRow {
748 // AppInfoBase
749 app_version: String,
750 major: Option<i32>,
751 minor: Option<i32>,
752 patch: Option<i32>,
753 release_channel: String,
754
755 // ClientEventBase
756 installation_id: Option<String>,
757 session_id: Option<String>,
758 is_staff: Option<bool>,
759 time: i64,
760
761 // AppEventRow
762 operation: String,
763}
764
765impl AppEventRow {
766 fn from_event(
767 event: AppEvent,
768 wrapper: &EventWrapper,
769 body: &EventRequestBody,
770 first_event_at: chrono::DateTime<chrono::Utc>,
771 ) -> Self {
772 let semver = body.semver();
773 let time =
774 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
775
776 Self {
777 app_version: body.app_version.clone(),
778 major: semver.map(|s| s.major as i32),
779 minor: semver.map(|s| s.minor as i32),
780 patch: semver.map(|s| s.patch as i32),
781 release_channel: body.release_channel.clone().unwrap_or_default(),
782 installation_id: body.installation_id.clone(),
783 session_id: body.session_id.clone(),
784 is_staff: body.is_staff,
785 time: time.timestamp_millis(),
786 operation: event.operation,
787 }
788 }
789}
790
791#[derive(Serialize, Debug, clickhouse::Row)]
792pub struct SettingEventRow {
793 // AppInfoBase
794 app_version: String,
795 major: Option<i32>,
796 minor: Option<i32>,
797 patch: Option<i32>,
798 release_channel: String,
799
800 // ClientEventBase
801 installation_id: Option<String>,
802 session_id: Option<String>,
803 is_staff: Option<bool>,
804 time: i64,
805 // SettingEventRow
806 setting: String,
807 value: String,
808}
809
810impl SettingEventRow {
811 fn from_event(
812 event: SettingEvent,
813 wrapper: &EventWrapper,
814 body: &EventRequestBody,
815 first_event_at: chrono::DateTime<chrono::Utc>,
816 ) -> Self {
817 let semver = body.semver();
818 let time =
819 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
820
821 Self {
822 app_version: body.app_version.clone(),
823 major: semver.map(|s| s.major as i32),
824 minor: semver.map(|s| s.minor as i32),
825 patch: semver.map(|s| s.patch as i32),
826 release_channel: body.release_channel.clone().unwrap_or_default(),
827 installation_id: body.installation_id.clone(),
828 session_id: body.session_id.clone(),
829 is_staff: body.is_staff,
830 time: time.timestamp_millis(),
831 setting: event.setting,
832 value: event.value,
833 }
834 }
835}
836
837#[derive(Serialize, Debug, clickhouse::Row)]
838pub struct EditEventRow {
839 // AppInfoBase
840 app_version: String,
841 major: Option<i32>,
842 minor: Option<i32>,
843 patch: Option<i32>,
844 release_channel: String,
845
846 // ClientEventBase
847 installation_id: Option<String>,
848 // Note: This column name has a typo in the ClickHouse table.
849 #[serde(rename = "sesssion_id")]
850 session_id: Option<String>,
851 is_staff: Option<bool>,
852 time: i64,
853
854 // EditEventRow
855 period_start: i64,
856 period_end: i64,
857 environment: String,
858}
859
860impl EditEventRow {
861 fn from_event(
862 event: EditEvent,
863 wrapper: &EventWrapper,
864 body: &EventRequestBody,
865 first_event_at: chrono::DateTime<chrono::Utc>,
866 ) -> Self {
867 let semver = body.semver();
868 let time =
869 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
870
871 let period_start = time - chrono::Duration::milliseconds(event.duration);
872 let period_end = time;
873
874 Self {
875 app_version: body.app_version.clone(),
876 major: semver.map(|s| s.major as i32),
877 minor: semver.map(|s| s.minor as i32),
878 patch: semver.map(|s| s.patch as i32),
879 release_channel: body.release_channel.clone().unwrap_or_default(),
880 installation_id: body.installation_id.clone(),
881 session_id: body.session_id.clone(),
882 is_staff: body.is_staff,
883 time: time.timestamp_millis(),
884 period_start: period_start.timestamp_millis(),
885 period_end: period_end.timestamp_millis(),
886 environment: event.environment,
887 }
888 }
889}
890
891#[derive(Serialize, Debug, clickhouse::Row)]
892pub struct ActionEventRow {
893 // AppInfoBase
894 app_version: String,
895 major: Option<i32>,
896 minor: Option<i32>,
897 patch: Option<i32>,
898 release_channel: String,
899
900 // ClientEventBase
901 installation_id: Option<String>,
902 // Note: This column name has a typo in the ClickHouse table.
903 #[serde(rename = "sesssion_id")]
904 session_id: Option<String>,
905 is_staff: Option<bool>,
906 time: i64,
907 // ActionEventRow
908 source: String,
909 action: String,
910}
911
912impl ActionEventRow {
913 fn from_event(
914 event: ActionEvent,
915 wrapper: &EventWrapper,
916 body: &EventRequestBody,
917 first_event_at: chrono::DateTime<chrono::Utc>,
918 ) -> Self {
919 let semver = body.semver();
920 let time =
921 first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
922
923 Self {
924 app_version: body.app_version.clone(),
925 major: semver.map(|s| s.major as i32),
926 minor: semver.map(|s| s.minor as i32),
927 patch: semver.map(|s| s.patch as i32),
928 release_channel: body.release_channel.clone().unwrap_or_default(),
929 installation_id: body.installation_id.clone(),
930 session_id: body.session_id.clone(),
931 is_staff: body.is_staff,
932 time: time.timestamp_millis(),
933 source: event.source,
934 action: event.action,
935 }
936 }
937}