telemetry.rs

  1use crate::http::HttpClient;
  2use db::Db;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use isahc::Request;
  9use lazy_static::lazy_static;
 10use parking_lot::Mutex;
 11use serde::Serialize;
 12use std::{
 13    mem,
 14    sync::Arc,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use util::{post_inc, ResultExt, TryFutureExt};
 18use uuid::Uuid;
 19
 20pub struct Telemetry {
 21    client: Arc<dyn HttpClient>,
 22    executor: Arc<Background>,
 23    session_id: u128,
 24    state: Mutex<TelemetryState>,
 25}
 26
 27#[derive(Default)]
 28struct TelemetryState {
 29    user_id: Option<Arc<str>>,
 30    device_id: Option<Arc<str>>,
 31    app_version: Option<Arc<str>>,
 32    os_version: Option<Arc<str>>,
 33    os_name: &'static str,
 34    queue: Vec<AmplitudeEvent>,
 35    next_event_id: usize,
 36    flush_task: Option<Task<()>>,
 37}
 38
 39const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch";
 40
 41lazy_static! {
 42    static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
 43        .ok()
 44        .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
 45}
 46
 47#[derive(Serialize)]
 48struct AmplitudeEventBatch {
 49    api_key: &'static str,
 50    events: Vec<AmplitudeEvent>,
 51}
 52
 53#[derive(Serialize)]
 54struct AmplitudeEvent {
 55    user_id: Option<Arc<str>>,
 56    device_id: Option<Arc<str>>,
 57    event_type: String,
 58    event_properties: Option<Map<String, Value>>,
 59    user_properties: Option<Map<String, Value>>,
 60    os_name: &'static str,
 61    os_version: Option<Arc<str>>,
 62    app_version: Option<Arc<str>>,
 63    event_id: usize,
 64    session_id: u128,
 65    time: u128,
 66}
 67
 68#[cfg(debug_assertions)]
 69const MAX_QUEUE_LEN: usize = 1;
 70
 71#[cfg(not(debug_assertions))]
 72const MAX_QUEUE_LEN: usize = 10;
 73
 74#[cfg(debug_assertions)]
 75const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
 76
 77#[cfg(not(debug_assertions))]
 78const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
 79
 80impl Telemetry {
 81    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
 82        let platform = cx.platform();
 83        Arc::new(Self {
 84            client,
 85            executor: cx.background().clone(),
 86            session_id: SystemTime::now()
 87                .duration_since(UNIX_EPOCH)
 88                .unwrap()
 89                .as_millis(),
 90            state: Mutex::new(TelemetryState {
 91                os_version: platform
 92                    .os_version()
 93                    .log_err()
 94                    .map(|v| v.to_string().into()),
 95                os_name: platform.os_name().into(),
 96                app_version: platform
 97                    .app_version()
 98                    .log_err()
 99                    .map(|v| v.to_string().into()),
100                device_id: None,
101                queue: Default::default(),
102                flush_task: Default::default(),
103                next_event_id: 0,
104                user_id: None,
105            }),
106        })
107    }
108
109    pub fn start(self: &Arc<Self>, db: Arc<Db>) {
110        let this = self.clone();
111        self.executor
112            .spawn(
113                async move {
114                    let device_id = if let Some(device_id) = db
115                        .read(["device_id"])?
116                        .into_iter()
117                        .flatten()
118                        .next()
119                        .and_then(|bytes| String::from_utf8(bytes).ok())
120                    {
121                        device_id
122                    } else {
123                        let device_id = Uuid::new_v4().to_string();
124                        db.write([("device_id", device_id.as_bytes())])?;
125                        device_id
126                    };
127
128                    let device_id = Some(Arc::from(device_id));
129                    let mut state = this.state.lock();
130                    state.device_id = device_id.clone();
131                    for event in &mut state.queue {
132                        event.device_id = device_id.clone();
133                    }
134                    if !state.queue.is_empty() {
135                        drop(state);
136                        this.flush();
137                    }
138
139                    anyhow::Ok(())
140                }
141                .log_err(),
142            )
143            .detach();
144    }
145
146    pub fn set_user_id(&self, user_id: Option<u64>) {
147        self.state.lock().user_id = user_id.map(|id| id.to_string().into());
148    }
149
150    pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
151        if AMPLITUDE_API_KEY.is_none() {
152            return;
153        }
154
155        let mut state = self.state.lock();
156        let event = AmplitudeEvent {
157            event_type: kind.to_string(),
158            time: SystemTime::now()
159                .duration_since(UNIX_EPOCH)
160                .unwrap()
161                .as_millis(),
162            session_id: self.session_id,
163            event_properties: if let Value::Object(properties) = properties {
164                Some(properties)
165            } else {
166                None
167            },
168            user_properties: None,
169            user_id: state.user_id.clone(),
170            device_id: state.device_id.clone(),
171            os_name: state.os_name,
172            os_version: state.os_version.clone(),
173            app_version: state.app_version.clone(),
174            event_id: post_inc(&mut state.next_event_id),
175        };
176        state.queue.push(event);
177        if state.device_id.is_some() {
178            if state.queue.len() >= MAX_QUEUE_LEN {
179                drop(state);
180                self.flush();
181            } else {
182                let this = self.clone();
183                let executor = self.executor.clone();
184                state.flush_task = Some(self.executor.spawn(async move {
185                    executor.timer(DEBOUNCE_INTERVAL).await;
186                    this.flush();
187                }));
188            }
189        }
190    }
191
192    fn flush(&self) {
193        let mut state = self.state.lock();
194        let events = mem::take(&mut state.queue);
195        state.flush_task.take();
196
197        if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
198            let client = self.client.clone();
199            self.executor
200                .spawn(async move {
201                    let batch = AmplitudeEventBatch { api_key, events };
202                    let body = serde_json::to_vec(&batch).log_err()?;
203                    let request = Request::post(AMPLITUDE_EVENTS_URL)
204                        .body(body.into())
205                        .log_err()?;
206                    client.send(request).await.log_err();
207                    Some(())
208                })
209                .detach();
210        }
211    }
212}