telemetry.rs

  1use crate::{http::HttpClient, ZED_SECRET_CLIENT_TOKEN};
  2use gpui::{
  3    executor::Background,
  4    serde_json::{self, value::Map, Value},
  5    AppContext, Task,
  6};
  7use isahc::Request;
  8use parking_lot::Mutex;
  9use serde::Serialize;
 10use std::{
 11    mem,
 12    sync::Arc,
 13    time::{Duration, SystemTime, UNIX_EPOCH},
 14};
 15use util::{post_inc, ResultExt};
 16
 17pub struct Telemetry {
 18    client: Arc<dyn HttpClient>,
 19    executor: Arc<Background>,
 20    session_id: u128,
 21    state: Mutex<TelemetryState>,
 22}
 23
 24#[derive(Default)]
 25struct TelemetryState {
 26    user_id: Option<Arc<str>>,
 27    device_id: Option<Arc<str>>,
 28    app_version: Option<Arc<str>>,
 29    os_version: Option<Arc<str>>,
 30    os_name: &'static str,
 31    queue: Vec<AmplitudeEvent>,
 32    next_event_id: usize,
 33    flush_task: Option<Task<()>>,
 34}
 35
 36const AMPLITUDE_EVENTS_URL: &'static str = "https//api2.amplitude.com/batch";
 37
 38#[derive(Serialize)]
 39struct AmplitudeEventBatch {
 40    api_key: &'static str,
 41    events: Vec<AmplitudeEvent>,
 42}
 43
 44#[derive(Serialize)]
 45struct AmplitudeEvent {
 46    user_id: Option<Arc<str>>,
 47    device_id: Option<Arc<str>>,
 48    event_type: String,
 49    event_properties: Option<Map<String, Value>>,
 50    user_properties: Option<Map<String, Value>>,
 51    os_name: &'static str,
 52    os_version: Option<Arc<str>>,
 53    app_version: Option<Arc<str>>,
 54    event_id: usize,
 55    session_id: u128,
 56    time: u128,
 57}
 58
 59#[cfg(debug_assertions)]
 60const MAX_QUEUE_LEN: usize = 1;
 61
 62#[cfg(not(debug_assertions))]
 63const MAX_QUEUE_LEN: usize = 10;
 64
 65const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
 66
 67impl Telemetry {
 68    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
 69        let platform = cx.platform();
 70        Arc::new(Self {
 71            client,
 72            executor: cx.background().clone(),
 73            session_id: SystemTime::now()
 74                .duration_since(UNIX_EPOCH)
 75                .unwrap()
 76                .as_millis(),
 77            state: Mutex::new(TelemetryState {
 78                os_version: platform
 79                    .os_version()
 80                    .log_err()
 81                    .map(|v| v.to_string().into()),
 82                os_name: platform.os_name().into(),
 83                app_version: platform
 84                    .app_version()
 85                    .log_err()
 86                    .map(|v| v.to_string().into()),
 87                device_id: None,
 88                queue: Default::default(),
 89                flush_task: Default::default(),
 90                next_event_id: 0,
 91                user_id: None,
 92            }),
 93        })
 94    }
 95
 96    pub fn log_event(self: &Arc<Self>, kind: &str, properties: Value) {
 97        let mut state = self.state.lock();
 98        let event = AmplitudeEvent {
 99            event_type: kind.to_string(),
100            time: SystemTime::now()
101                .duration_since(UNIX_EPOCH)
102                .unwrap()
103                .as_millis(),
104            session_id: self.session_id,
105            event_properties: if let Value::Object(properties) = properties {
106                Some(properties)
107            } else {
108                None
109            },
110            user_properties: None,
111            user_id: state.user_id.clone(),
112            device_id: state.device_id.clone(),
113            os_name: state.os_name,
114            os_version: state.os_version.clone(),
115            app_version: state.app_version.clone(),
116            event_id: post_inc(&mut state.next_event_id),
117        };
118        state.queue.push(event);
119        if state.queue.len() >= MAX_QUEUE_LEN {
120            drop(state);
121            self.flush();
122        } else {
123            let this = self.clone();
124            let executor = self.executor.clone();
125            state.flush_task = Some(self.executor.spawn(async move {
126                executor.timer(DEBOUNCE_INTERVAL).await;
127                this.flush();
128            }));
129        }
130    }
131
132    fn flush(&self) {
133        let mut state = self.state.lock();
134        let events = mem::take(&mut state.queue);
135        let client = self.client.clone();
136        state.flush_task.take();
137        self.executor
138            .spawn(async move {
139                let body = serde_json::to_vec(&AmplitudeEventBatch {
140                    api_key: ZED_SECRET_CLIENT_TOKEN,
141                    events,
142                })
143                .log_err()?;
144                let request = Request::post(AMPLITUDE_EVENTS_URL)
145                    .header("Content-Type", "application/json")
146                    .body(body.into())
147                    .log_err()?;
148                client.send(request).await.log_err();
149                Some(())
150            })
151            .detach();
152    }
153}