telemetry.rs

  1use crate::http::HttpClient;
  2use db::Db;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use isahc::Request;
  9use lazy_static::lazy_static;
 10use parking_lot::Mutex;
 11use serde::Serialize;
 12use std::{
 13    io::Write,
 14    mem,
 15    path::PathBuf,
 16    sync::Arc,
 17    time::{Duration, SystemTime, UNIX_EPOCH},
 18};
 19use tempfile::NamedTempFile;
 20use util::{post_inc, ResultExt, TryFutureExt};
 21use uuid::Uuid;
 22
 23pub struct Telemetry {
 24    http_client: Arc<dyn HttpClient>,
 25    executor: Arc<Background>,
 26    session_id: u128,
 27    state: Mutex<TelemetryState>,
 28}
 29
 30#[derive(Default)]
 31struct TelemetryState {
 32    user_id: Option<Arc<str>>,
 33    device_id: Option<Arc<str>>,
 34    app_version: Option<Arc<str>>,
 35    os_version: Option<Arc<str>>,
 36    os_name: &'static str,
 37    queue: Vec<AmplitudeEvent>,
 38    next_event_id: usize,
 39    flush_task: Option<Task<()>>,
 40    log_file: Option<NamedTempFile>,
 41}
 42
 43const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch";
 44
 45lazy_static! {
 46    static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
 47        .ok()
 48        .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
 49}
 50
 51#[derive(Serialize)]
 52struct AmplitudeEventBatch {
 53    api_key: &'static str,
 54    events: Vec<AmplitudeEvent>,
 55    options: AmplitudeEventBatchOptions,
 56}
 57
 58#[derive(Serialize)]
 59struct AmplitudeEventBatchOptions {
 60    min_id_length: usize,
 61}
 62
 63#[derive(Serialize)]
 64struct AmplitudeEvent {
 65    #[serde(skip_serializing_if = "Option::is_none")]
 66    user_id: Option<Arc<str>>,
 67    device_id: Option<Arc<str>>,
 68    event_type: String,
 69    #[serde(skip_serializing_if = "Option::is_none")]
 70    event_properties: Option<Map<String, Value>>,
 71    #[serde(skip_serializing_if = "Option::is_none")]
 72    user_properties: Option<Map<String, Value>>,
 73    os_name: &'static str,
 74    os_version: Option<Arc<str>>,
 75    app_version: Option<Arc<str>>,
 76    event_id: usize,
 77    session_id: u128,
 78    time: u128,
 79}
 80
 81#[cfg(debug_assertions)]
 82const MAX_QUEUE_LEN: usize = 1;
 83
 84#[cfg(not(debug_assertions))]
 85const MAX_QUEUE_LEN: usize = 10;
 86
 87#[cfg(debug_assertions)]
 88const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
 89
 90#[cfg(not(debug_assertions))]
 91const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
 92
 93impl Telemetry {
 94    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
 95        let platform = cx.platform();
 96        let this = Arc::new(Self {
 97            http_client: client,
 98            executor: cx.background().clone(),
 99            session_id: SystemTime::now()
100                .duration_since(UNIX_EPOCH)
101                .unwrap()
102                .as_millis(),
103            state: Mutex::new(TelemetryState {
104                os_version: platform
105                    .os_version()
106                    .log_err()
107                    .map(|v| v.to_string().into()),
108                os_name: platform.os_name().into(),
109                app_version: platform
110                    .app_version()
111                    .log_err()
112                    .map(|v| v.to_string().into()),
113                device_id: None,
114                queue: Default::default(),
115                flush_task: Default::default(),
116                next_event_id: 0,
117                log_file: None,
118                user_id: None,
119            }),
120        });
121
122        if AMPLITUDE_API_KEY.is_some() {
123            this.executor
124                .spawn({
125                    let this = this.clone();
126                    async move {
127                        if let Some(tempfile) = NamedTempFile::new().log_err() {
128                            this.state.lock().log_file = Some(tempfile);
129                        }
130                    }
131                })
132                .detach();
133        }
134
135        this
136    }
137
138    pub fn log_file_path(&self) -> Option<PathBuf> {
139        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
140    }
141
142    pub fn start(self: &Arc<Self>, db: Arc<Db>) {
143        let this = self.clone();
144        self.executor
145            .spawn(
146                async move {
147                    let device_id = if let Some(device_id) = db
148                        .read(["device_id"])?
149                        .into_iter()
150                        .flatten()
151                        .next()
152                        .and_then(|bytes| String::from_utf8(bytes).ok())
153                    {
154                        device_id
155                    } else {
156                        let device_id = Uuid::new_v4().to_string();
157                        db.write([("device_id", device_id.as_bytes())])?;
158                        device_id
159                    };
160
161                    let device_id = Some(Arc::from(device_id));
162                    let mut state = this.state.lock();
163                    state.device_id = device_id.clone();
164                    for event in &mut state.queue {
165                        event.device_id = device_id.clone();
166                    }
167                    if !state.queue.is_empty() {
168                        drop(state);
169                        this.flush();
170                    }
171
172                    anyhow::Ok(())
173                }
174                .log_err(),
175            )
176            .detach();
177    }
178
179    pub fn set_user_id(&self, user_id: Option<u64>) {
180        self.state.lock().user_id = user_id.map(|id| id.to_string().into());
181    }
182
183    pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
184        if AMPLITUDE_API_KEY.is_none() {
185            return;
186        }
187
188        let mut state = self.state.lock();
189        let event = AmplitudeEvent {
190            event_type: kind.to_string(),
191            time: SystemTime::now()
192                .duration_since(UNIX_EPOCH)
193                .unwrap()
194                .as_millis(),
195            session_id: self.session_id,
196            event_properties: if let Value::Object(properties) = properties {
197                Some(properties)
198            } else {
199                None
200            },
201            user_properties: None,
202            user_id: state.user_id.clone(),
203            device_id: state.device_id.clone(),
204            os_name: state.os_name,
205            os_version: state.os_version.clone(),
206            app_version: state.app_version.clone(),
207            event_id: post_inc(&mut state.next_event_id),
208        };
209        state.queue.push(event);
210        if state.device_id.is_some() {
211            if state.queue.len() >= MAX_QUEUE_LEN {
212                drop(state);
213                self.flush();
214            } else {
215                let this = self.clone();
216                let executor = self.executor.clone();
217                state.flush_task = Some(self.executor.spawn(async move {
218                    executor.timer(DEBOUNCE_INTERVAL).await;
219                    this.flush();
220                }));
221            }
222        }
223    }
224
225    fn flush(self: &Arc<Self>) {
226        let mut state = self.state.lock();
227        let events = mem::take(&mut state.queue);
228        state.flush_task.take();
229        drop(state);
230
231        if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
232            let this = self.clone();
233            self.executor
234                .spawn(
235                    async move {
236                        let mut json_bytes = Vec::new();
237
238                        if let Some(file) = &mut this.state.lock().log_file {
239                            let file = file.as_file_mut();
240                            for event in &events {
241                                json_bytes.clear();
242                                serde_json::to_writer(&mut json_bytes, event)?;
243                                file.write_all(&json_bytes)?;
244                                file.write(b"\n")?;
245                            }
246                        }
247
248                        let batch = AmplitudeEventBatch {
249                            api_key,
250                            events,
251                            options: AmplitudeEventBatchOptions { min_id_length: 1 },
252                        };
253                        json_bytes.clear();
254                        serde_json::to_writer(&mut json_bytes, &batch)?;
255                        let request =
256                            Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?;
257                        this.http_client.send(request).await?;
258                        Ok(())
259                    }
260                    .log_err(),
261                )
262                .detach();
263        }
264    }
265}