events.rs

  1use crate::api::CloudflareIpCountryHeader;
  2use crate::{AppState, Error, Result};
  3use anyhow::anyhow;
  4use axum::{
  5    Extension, Router, TypedHeader,
  6    body::Bytes,
  7    headers::Header,
  8    http::{HeaderName, StatusCode},
  9    routing::post,
 10};
 11use chrono::Duration;
 12use serde::{Deserialize, Serialize};
 13use serde_json::json;
 14use sha2::{Digest, Sha256};
 15use std::sync::{Arc, OnceLock};
 16use telemetry_events::{Event, EventRequestBody};
 17use util::ResultExt;
 18use uuid::Uuid;
 19
 20pub fn router() -> Router {
 21    Router::new()
 22        .route("/telemetry/events", post(post_events))
 23        .route("/telemetry/crashes", post(post_panic))
 24        .route("/telemetry/panics", post(post_panic))
 25        .route("/telemetry/hangs", post(post_panic))
 26}
 27
 28pub struct ZedChecksumHeader(Vec<u8>);
 29
 30impl Header for ZedChecksumHeader {
 31    fn name() -> &'static HeaderName {
 32        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
 33        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
 34    }
 35
 36    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
 37    where
 38        Self: Sized,
 39        I: Iterator<Item = &'i axum::http::HeaderValue>,
 40    {
 41        let checksum = values
 42            .next()
 43            .ok_or_else(axum::headers::Error::invalid)?
 44            .to_str()
 45            .map_err(|_| axum::headers::Error::invalid())?;
 46
 47        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
 48        Ok(Self(bytes))
 49    }
 50
 51    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
 52        unimplemented!()
 53    }
 54}
 55
 56pub async fn post_panic() -> Result<()> {
 57    // as of v0.201.x crash/panic reporting is now done via Sentry.
 58    // The endpoint returns OK to avoid spurious errors for old clients.
 59    Ok(())
 60}
 61
 62pub async fn post_events(
 63    Extension(app): Extension<Arc<AppState>>,
 64    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
 65    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
 66    body: Bytes,
 67) -> Result<()> {
 68    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
 69        return Err(Error::http(
 70            StatusCode::INTERNAL_SERVER_ERROR,
 71            "events not enabled".into(),
 72        ))?;
 73    };
 74
 75    let checksum_matched = checksum == expected;
 76
 77    let request_body: telemetry_events::EventRequestBody =
 78        serde_json::from_slice(&body).map_err(|err| {
 79            log::error!("can't parse event json: {err}");
 80            Error::Internal(anyhow!(err))
 81        })?;
 82
 83    let Some(last_event) = request_body.events.last() else {
 84        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
 85    };
 86    let country_code = country_code_header.map(|h| h.to_string());
 87
 88    let first_event_at = chrono::Utc::now()
 89        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 90
 91    if let Some(kinesis_client) = app.kinesis_client.clone()
 92        && let Some(stream) = app.config.kinesis_stream.clone()
 93    {
 94        let mut request = kinesis_client.put_records().stream_name(stream);
 95        let mut has_records = false;
 96        for row in for_snowflake(
 97            request_body.clone(),
 98            first_event_at,
 99            country_code.clone(),
100            checksum_matched,
101        ) {
102            if let Some(data) = serde_json::to_vec(&row).log_err() {
103                request = request.records(
104                    aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
105                        .partition_key(request_body.system_id.clone().unwrap_or_default())
106                        .data(data.into())
107                        .build()
108                        .unwrap(),
109                );
110                has_records = true;
111            }
112        }
113        if has_records {
114            request.send().await.log_err();
115        }
116    };
117
118    Ok(())
119}
120
121pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
122    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;
123
124    let mut summer = Sha256::new();
125    summer.update(checksum_seed);
126    summer.update(json);
127    summer.update(checksum_seed);
128    Some(summer.finalize().into_iter().collect())
129}
130
131fn for_snowflake(
132    body: EventRequestBody,
133    first_event_at: chrono::DateTime<chrono::Utc>,
134    country_code: Option<String>,
135    checksum_matched: bool,
136) -> impl Iterator<Item = SnowflakeRow> {
137    body.events.into_iter().map(move |event| {
138        let timestamp =
139            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
140        let (event_type, mut event_properties) = match &event.event {
141            Event::Flexible(e) => (
142                e.event_type.clone(),
143                serde_json::to_value(&e.event_properties).unwrap(),
144            ),
145        };
146
147        if let serde_json::Value::Object(ref mut map) = event_properties {
148            map.insert("app_version".to_string(), body.app_version.clone().into());
149            map.insert("os_name".to_string(), body.os_name.clone().into());
150            map.insert("os_version".to_string(), body.os_version.clone().into());
151            map.insert("architecture".to_string(), body.architecture.clone().into());
152            map.insert(
153                "release_channel".to_string(),
154                body.release_channel.clone().into(),
155            );
156            map.insert("signed_in".to_string(), event.signed_in.into());
157            map.insert("checksum_matched".to_string(), checksum_matched.into());
158            if let Some(country_code) = country_code.as_ref() {
159                map.insert("country".to_string(), country_code.clone().into());
160            }
161        }
162
163        // NOTE: most amplitude user properties are read out of our event_properties
164        // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
165        // for how that is configured.
166        let user_properties = body.is_staff.map(|is_staff| {
167            serde_json::json!({
168                "is_staff": is_staff,
169            })
170        });
171
172        SnowflakeRow {
173            time: timestamp,
174            user_id: body.metrics_id.clone(),
175            device_id: body.system_id.clone(),
176            event_type,
177            event_properties,
178            user_properties,
179            insert_id: Some(Uuid::new_v4().to_string()),
180        }
181    })
182}
183
184#[derive(Serialize, Deserialize, Debug)]
185pub struct SnowflakeRow {
186    pub time: chrono::DateTime<chrono::Utc>,
187    pub user_id: Option<String>,
188    pub device_id: Option<String>,
189    pub event_type: String,
190    pub event_properties: serde_json::Value,
191    pub user_properties: Option<serde_json::Value>,
192    pub insert_id: Option<String>,
193}
194
195impl SnowflakeRow {
196    pub fn new(
197        event_type: impl Into<String>,
198        metrics_id: Option<Uuid>,
199        is_staff: bool,
200        system_id: Option<String>,
201        event_properties: serde_json::Value,
202    ) -> Self {
203        Self {
204            time: chrono::Utc::now(),
205            event_type: event_type.into(),
206            device_id: system_id,
207            user_id: metrics_id.map(|id| id.to_string()),
208            insert_id: Some(uuid::Uuid::new_v4().to_string()),
209            event_properties,
210            user_properties: Some(json!({"is_staff": is_staff})),
211        }
212    }
213
214    pub async fn write(
215        self,
216        client: &Option<aws_sdk_kinesis::Client>,
217        stream: &Option<String>,
218    ) -> anyhow::Result<()> {
219        let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
220            return Ok(());
221        };
222        let row = serde_json::to_vec(&self)?;
223        client
224            .put_record()
225            .stream_name(stream)
226            .partition_key(&self.user_id.unwrap_or_default())
227            .data(row.into())
228            .send()
229            .await?;
230        Ok(())
231    }
232}