telemetry.rs

  1use crate::http::HttpClient;
  2use db::Db;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use isahc::Request;
  9use lazy_static::lazy_static;
 10use parking_lot::Mutex;
 11use serde::Serialize;
 12use std::{
 13    io::Write,
 14    mem,
 15    path::PathBuf,
 16    sync::Arc,
 17    time::{Duration, SystemTime, UNIX_EPOCH},
 18};
 19use tempfile::NamedTempFile;
 20use util::{post_inc, ResultExt, TryFutureExt};
 21use uuid::Uuid;
 22
 23pub struct Telemetry {
 24    http_client: Arc<dyn HttpClient>,
 25    executor: Arc<Background>,
 26    session_id: u128,
 27    state: Mutex<TelemetryState>,
 28}
 29
 30#[derive(Default)]
 31struct TelemetryState {
 32    user_id: Option<Arc<str>>,
 33    device_id: Option<Arc<str>>,
 34    app_version: Option<Arc<str>>,
 35    os_version: Option<Arc<str>>,
 36    os_name: &'static str,
 37    queue: Vec<AmplitudeEvent>,
 38    next_event_id: usize,
 39    flush_task: Option<Task<()>>,
 40    log_file: Option<NamedTempFile>,
 41}
 42
 43const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch";
 44
 45lazy_static! {
 46    static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
 47        .ok()
 48        .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
 49}
 50
 51#[derive(Serialize)]
 52struct AmplitudeEventBatch {
 53    api_key: &'static str,
 54    events: Vec<AmplitudeEvent>,
 55}
 56
 57#[derive(Serialize)]
 58struct AmplitudeEvent {
 59    #[serde(skip_serializing_if = "Option::is_none")]
 60    user_id: Option<Arc<str>>,
 61    device_id: Option<Arc<str>>,
 62    event_type: String,
 63    #[serde(skip_serializing_if = "Option::is_none")]
 64    event_properties: Option<Map<String, Value>>,
 65    #[serde(skip_serializing_if = "Option::is_none")]
 66    user_properties: Option<Map<String, Value>>,
 67    os_name: &'static str,
 68    os_version: Option<Arc<str>>,
 69    app_version: Option<Arc<str>>,
 70    event_id: usize,
 71    session_id: u128,
 72    time: u128,
 73}
 74
 75#[cfg(debug_assertions)]
 76const MAX_QUEUE_LEN: usize = 1;
 77
 78#[cfg(not(debug_assertions))]
 79const MAX_QUEUE_LEN: usize = 10;
 80
 81#[cfg(debug_assertions)]
 82const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
 83
 84#[cfg(not(debug_assertions))]
 85const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
 86
 87impl Telemetry {
 88    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
 89        let platform = cx.platform();
 90        let this = Arc::new(Self {
 91            http_client: client,
 92            executor: cx.background().clone(),
 93            session_id: SystemTime::now()
 94                .duration_since(UNIX_EPOCH)
 95                .unwrap()
 96                .as_millis(),
 97            state: Mutex::new(TelemetryState {
 98                os_version: platform
 99                    .os_version()
100                    .log_err()
101                    .map(|v| v.to_string().into()),
102                os_name: platform.os_name().into(),
103                app_version: platform
104                    .app_version()
105                    .log_err()
106                    .map(|v| v.to_string().into()),
107                device_id: None,
108                queue: Default::default(),
109                flush_task: Default::default(),
110                next_event_id: 0,
111                log_file: None,
112                user_id: None,
113            }),
114        });
115
116        if AMPLITUDE_API_KEY.is_some() {
117            this.executor
118                .spawn({
119                    let this = this.clone();
120                    async move {
121                        if let Some(tempfile) = NamedTempFile::new().log_err() {
122                            this.state.lock().log_file = Some(tempfile);
123                        }
124                    }
125                })
126                .detach();
127        }
128
129        this
130    }
131
132    pub fn log_file_path(&self) -> Option<PathBuf> {
133        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
134    }
135
136    pub fn start(self: &Arc<Self>, db: Arc<Db>) {
137        let this = self.clone();
138        self.executor
139            .spawn(
140                async move {
141                    let device_id = if let Some(device_id) = db
142                        .read(["device_id"])?
143                        .into_iter()
144                        .flatten()
145                        .next()
146                        .and_then(|bytes| String::from_utf8(bytes).ok())
147                    {
148                        device_id
149                    } else {
150                        let device_id = Uuid::new_v4().to_string();
151                        db.write([("device_id", device_id.as_bytes())])?;
152                        device_id
153                    };
154
155                    let device_id = Some(Arc::from(device_id));
156                    let mut state = this.state.lock();
157                    state.device_id = device_id.clone();
158                    for event in &mut state.queue {
159                        event.device_id = device_id.clone();
160                    }
161                    if !state.queue.is_empty() {
162                        drop(state);
163                        this.flush();
164                    }
165
166                    anyhow::Ok(())
167                }
168                .log_err(),
169            )
170            .detach();
171    }
172
173    pub fn set_user_id(&self, user_id: Option<u64>) {
174        self.state.lock().user_id = user_id.map(|id| id.to_string().into());
175    }
176
177    pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
178        if AMPLITUDE_API_KEY.is_none() {
179            return;
180        }
181
182        let mut state = self.state.lock();
183        let event = AmplitudeEvent {
184            event_type: kind.to_string(),
185            time: SystemTime::now()
186                .duration_since(UNIX_EPOCH)
187                .unwrap()
188                .as_millis(),
189            session_id: self.session_id,
190            event_properties: if let Value::Object(properties) = properties {
191                Some(properties)
192            } else {
193                None
194            },
195            user_properties: None,
196            user_id: state.user_id.clone(),
197            device_id: state.device_id.clone(),
198            os_name: state.os_name,
199            os_version: state.os_version.clone(),
200            app_version: state.app_version.clone(),
201            event_id: post_inc(&mut state.next_event_id),
202        };
203        state.queue.push(event);
204        if state.device_id.is_some() {
205            if state.queue.len() >= MAX_QUEUE_LEN {
206                drop(state);
207                self.flush();
208            } else {
209                let this = self.clone();
210                let executor = self.executor.clone();
211                state.flush_task = Some(self.executor.spawn(async move {
212                    executor.timer(DEBOUNCE_INTERVAL).await;
213                    this.flush();
214                }));
215            }
216        }
217    }
218
219    fn flush(self: &Arc<Self>) {
220        let mut state = self.state.lock();
221        let events = mem::take(&mut state.queue);
222        state.flush_task.take();
223        drop(state);
224
225        if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
226            let this = self.clone();
227            self.executor
228                .spawn(
229                    async move {
230                        let mut json_bytes = Vec::new();
231
232                        if let Some(file) = &mut this.state.lock().log_file {
233                            let file = file.as_file_mut();
234                            for event in &events {
235                                json_bytes.clear();
236                                serde_json::to_writer(&mut json_bytes, event)?;
237                                file.write_all(&json_bytes)?;
238                                file.write(b"\n")?;
239                            }
240                        }
241
242                        let batch = AmplitudeEventBatch { api_key, events };
243                        json_bytes.clear();
244                        serde_json::to_writer(&mut json_bytes, &batch)?;
245                        let request =
246                            Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?;
247                        this.http_client.send(request).await?;
248                        Ok(())
249                    }
250                    .log_err(),
251                )
252                .detach();
253        }
254    }
255}