1use crate::api::CloudflareIpCountryHeader;
2use crate::{AppState, Error, Result};
3use anyhow::anyhow;
4use axum::{
5 Extension, Router, TypedHeader,
6 body::Bytes,
7 headers::Header,
8 http::{HeaderName, StatusCode},
9 routing::post,
10};
11use chrono::Duration;
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use sha2::{Digest, Sha256};
15use std::sync::{Arc, OnceLock};
16use telemetry_events::{Event, EventRequestBody};
17use util::ResultExt;
18use uuid::Uuid;
19
20pub fn router() -> Router {
21 Router::new()
22 .route("/telemetry/events", post(post_events))
23 .route("/telemetry/crashes", post(post_panic))
24 .route("/telemetry/panics", post(post_panic))
25 .route("/telemetry/hangs", post(post_panic))
26}
27
28pub struct ZedChecksumHeader(Vec<u8>);
29
30impl Header for ZedChecksumHeader {
31 fn name() -> &'static HeaderName {
32 static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
33 ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
34 }
35
36 fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
37 where
38 Self: Sized,
39 I: Iterator<Item = &'i axum::http::HeaderValue>,
40 {
41 let checksum = values
42 .next()
43 .ok_or_else(axum::headers::Error::invalid)?
44 .to_str()
45 .map_err(|_| axum::headers::Error::invalid())?;
46
47 let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
48 Ok(Self(bytes))
49 }
50
51 fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
52 unimplemented!()
53 }
54}
55
56pub async fn post_panic() -> Result<()> {
57 // as of v0.201.x crash/panic reporting is now done via Sentry.
58 // The endpoint returns OK to avoid spurious errors for old clients.
59 Ok(())
60}
61
62pub async fn post_events(
63 Extension(app): Extension<Arc<AppState>>,
64 TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
65 country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
66 body: Bytes,
67) -> Result<()> {
68 let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
69 return Err(Error::http(
70 StatusCode::INTERNAL_SERVER_ERROR,
71 "events not enabled".into(),
72 ))?;
73 };
74
75 let checksum_matched = checksum == expected;
76
77 let request_body: telemetry_events::EventRequestBody =
78 serde_json::from_slice(&body).map_err(|err| {
79 log::error!("can't parse event json: {err}");
80 Error::Internal(anyhow!(err))
81 })?;
82
83 let Some(last_event) = request_body.events.last() else {
84 return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
85 };
86 let country_code = country_code_header.map(|h| h.to_string());
87
88 let first_event_at = chrono::Utc::now()
89 - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
90
91 if let Some(kinesis_client) = app.kinesis_client.clone()
92 && let Some(stream) = app.config.kinesis_stream.clone()
93 {
94 let mut request = kinesis_client.put_records().stream_name(stream);
95 let mut has_records = false;
96 for row in for_snowflake(
97 request_body.clone(),
98 first_event_at,
99 country_code.clone(),
100 checksum_matched,
101 ) {
102 if let Some(data) = serde_json::to_vec(&row).log_err() {
103 request = request.records(
104 aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
105 .partition_key(request_body.system_id.clone().unwrap_or_default())
106 .data(data.into())
107 .build()
108 .unwrap(),
109 );
110 has_records = true;
111 }
112 }
113 if has_records {
114 request.send().await.log_err();
115 }
116 };
117
118 Ok(())
119}
120
121pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
122 let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
123
124 let mut summer = Sha256::new();
125 summer.update(checksum_seed);
126 summer.update(json);
127 summer.update(checksum_seed);
128 Some(summer.finalize().into_iter().collect())
129}
130
131fn for_snowflake(
132 body: EventRequestBody,
133 first_event_at: chrono::DateTime<chrono::Utc>,
134 country_code: Option<String>,
135 checksum_matched: bool,
136) -> impl Iterator<Item = SnowflakeRow> {
137 body.events.into_iter().map(move |event| {
138 let timestamp =
139 first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
140 let (event_type, mut event_properties) = match &event.event {
141 Event::Flexible(e) => (
142 e.event_type.clone(),
143 serde_json::to_value(&e.event_properties).unwrap(),
144 ),
145 };
146
147 if let serde_json::Value::Object(ref mut map) = event_properties {
148 map.insert("app_version".to_string(), body.app_version.clone().into());
149 map.insert("os_name".to_string(), body.os_name.clone().into());
150 map.insert("os_version".to_string(), body.os_version.clone().into());
151 map.insert("architecture".to_string(), body.architecture.clone().into());
152 map.insert(
153 "release_channel".to_string(),
154 body.release_channel.clone().into(),
155 );
156 map.insert("signed_in".to_string(), event.signed_in.into());
157 map.insert("checksum_matched".to_string(), checksum_matched.into());
158 if let Some(country_code) = country_code.as_ref() {
159 map.insert("country".to_string(), country_code.clone().into());
160 }
161 }
162
163 // NOTE: most amplitude user properties are read out of our event_properties
164 // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
165 // for how that is configured.
166 let user_properties = body.is_staff.map(|is_staff| {
167 serde_json::json!({
168 "is_staff": is_staff,
169 })
170 });
171
172 SnowflakeRow {
173 time: timestamp,
174 user_id: body.metrics_id.clone(),
175 device_id: body.system_id.clone(),
176 event_type,
177 event_properties,
178 user_properties,
179 insert_id: Some(Uuid::new_v4().to_string()),
180 }
181 })
182}
183
184#[derive(Serialize, Deserialize, Debug)]
185pub struct SnowflakeRow {
186 pub time: chrono::DateTime<chrono::Utc>,
187 pub user_id: Option<String>,
188 pub device_id: Option<String>,
189 pub event_type: String,
190 pub event_properties: serde_json::Value,
191 pub user_properties: Option<serde_json::Value>,
192 pub insert_id: Option<String>,
193}
194
195impl SnowflakeRow {
196 pub fn new(
197 event_type: impl Into<String>,
198 metrics_id: Option<Uuid>,
199 is_staff: bool,
200 system_id: Option<String>,
201 event_properties: serde_json::Value,
202 ) -> Self {
203 Self {
204 time: chrono::Utc::now(),
205 event_type: event_type.into(),
206 device_id: system_id,
207 user_id: metrics_id.map(|id| id.to_string()),
208 insert_id: Some(uuid::Uuid::new_v4().to_string()),
209 event_properties,
210 user_properties: Some(json!({"is_staff": is_staff})),
211 }
212 }
213
214 pub async fn write(
215 self,
216 client: &Option<aws_sdk_kinesis::Client>,
217 stream: &Option<String>,
218 ) -> anyhow::Result<()> {
219 let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
220 return Ok(());
221 };
222 let row = serde_json::to_vec(&self)?;
223 client
224 .put_record()
225 .stream_name(stream)
226 .partition_key(&self.user_id.unwrap_or_default())
227 .data(row.into())
228 .send()
229 .await?;
230 Ok(())
231 }
232}