1use crate::http::HttpClient;
2use db::Db;
3use gpui::{
4 executor::Background,
5 serde_json::{self, value::Map, Value},
6 AppContext, Task,
7};
8use isahc::Request;
9use lazy_static::lazy_static;
10use parking_lot::Mutex;
11use serde::Serialize;
12use std::{
13 io::Write,
14 mem,
15 path::PathBuf,
16 sync::Arc,
17 time::{Duration, SystemTime, UNIX_EPOCH},
18};
19use tempfile::NamedTempFile;
20use util::{post_inc, ResultExt, TryFutureExt};
21use uuid::Uuid;
22
23pub struct Telemetry {
24 http_client: Arc<dyn HttpClient>,
25 executor: Arc<Background>,
26 session_id: u128,
27 state: Mutex<TelemetryState>,
28}
29
30#[derive(Default)]
31struct TelemetryState {
32 user_id: Option<Arc<str>>,
33 device_id: Option<Arc<str>>,
34 app_version: Option<Arc<str>>,
35 os_version: Option<Arc<str>>,
36 os_name: &'static str,
37 queue: Vec<AmplitudeEvent>,
38 next_event_id: usize,
39 flush_task: Option<Task<()>>,
40 log_file: Option<NamedTempFile>,
41}
42
/// Endpoint that receives serialized event batches.
const AMPLITUDE_EVENTS_URL: &str = "https://api2.amplitude.com/batch";
44
45lazy_static! {
46 static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
47 .ok()
48 .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
49}
50
51#[derive(Serialize)]
52struct AmplitudeEventBatch {
53 api_key: &'static str,
54 events: Vec<AmplitudeEvent>,
55 options: AmplitudeEventBatchOptions,
56}
57
58#[derive(Serialize)]
59struct AmplitudeEventBatchOptions {
60 min_id_length: usize,
61}
62
63#[derive(Serialize)]
64struct AmplitudeEvent {
65 #[serde(skip_serializing_if = "Option::is_none")]
66 user_id: Option<Arc<str>>,
67 device_id: Option<Arc<str>>,
68 event_type: String,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 event_properties: Option<Map<String, Value>>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 user_properties: Option<Map<String, Value>>,
73 os_name: &'static str,
74 os_version: Option<Arc<str>>,
75 app_version: Option<Arc<str>>,
76 event_id: usize,
77 session_id: u128,
78 time: u128,
79}
80
/// Queue length at which a recorded event triggers an immediate flush:
/// 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };
86
/// Delay before a debounced flush runs: 1 second in builds with debug
/// assertions, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
92
93impl Telemetry {
94 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
95 let platform = cx.platform();
96 let this = Arc::new(Self {
97 http_client: client,
98 executor: cx.background().clone(),
99 session_id: SystemTime::now()
100 .duration_since(UNIX_EPOCH)
101 .unwrap()
102 .as_millis(),
103 state: Mutex::new(TelemetryState {
104 os_version: platform
105 .os_version()
106 .log_err()
107 .map(|v| v.to_string().into()),
108 os_name: platform.os_name().into(),
109 app_version: platform
110 .app_version()
111 .log_err()
112 .map(|v| v.to_string().into()),
113 device_id: None,
114 queue: Default::default(),
115 flush_task: Default::default(),
116 next_event_id: 0,
117 log_file: None,
118 user_id: None,
119 }),
120 });
121
122 if AMPLITUDE_API_KEY.is_some() {
123 this.executor
124 .spawn({
125 let this = this.clone();
126 async move {
127 if let Some(tempfile) = NamedTempFile::new().log_err() {
128 this.state.lock().log_file = Some(tempfile);
129 }
130 }
131 })
132 .detach();
133 }
134
135 this
136 }
137
138 pub fn log_file_path(&self) -> Option<PathBuf> {
139 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
140 }
141
142 pub fn start(self: &Arc<Self>, db: Arc<Db>) {
143 let this = self.clone();
144 self.executor
145 .spawn(
146 async move {
147 let device_id = if let Some(device_id) = db
148 .read(["device_id"])?
149 .into_iter()
150 .flatten()
151 .next()
152 .and_then(|bytes| String::from_utf8(bytes).ok())
153 {
154 device_id
155 } else {
156 let device_id = Uuid::new_v4().to_string();
157 db.write([("device_id", device_id.as_bytes())])?;
158 device_id
159 };
160
161 let device_id = Some(Arc::from(device_id));
162 let mut state = this.state.lock();
163 state.device_id = device_id.clone();
164 for event in &mut state.queue {
165 event.device_id = device_id.clone();
166 }
167 if !state.queue.is_empty() {
168 drop(state);
169 this.flush();
170 }
171
172 anyhow::Ok(())
173 }
174 .log_err(),
175 )
176 .detach();
177 }
178
179 pub fn set_user_id(&self, user_id: Option<u64>) {
180 self.state.lock().user_id = user_id.map(|id| id.to_string().into());
181 }
182
183 pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
184 if AMPLITUDE_API_KEY.is_none() {
185 return;
186 }
187
188 let mut state = self.state.lock();
189 let event = AmplitudeEvent {
190 event_type: kind.to_string(),
191 time: SystemTime::now()
192 .duration_since(UNIX_EPOCH)
193 .unwrap()
194 .as_millis(),
195 session_id: self.session_id,
196 event_properties: if let Value::Object(properties) = properties {
197 Some(properties)
198 } else {
199 None
200 },
201 user_properties: None,
202 user_id: state.user_id.clone(),
203 device_id: state.device_id.clone(),
204 os_name: state.os_name,
205 os_version: state.os_version.clone(),
206 app_version: state.app_version.clone(),
207 event_id: post_inc(&mut state.next_event_id),
208 };
209 state.queue.push(event);
210 if state.device_id.is_some() {
211 if state.queue.len() >= MAX_QUEUE_LEN {
212 drop(state);
213 self.flush();
214 } else {
215 let this = self.clone();
216 let executor = self.executor.clone();
217 state.flush_task = Some(self.executor.spawn(async move {
218 executor.timer(DEBOUNCE_INTERVAL).await;
219 this.flush();
220 }));
221 }
222 }
223 }
224
225 fn flush(self: &Arc<Self>) {
226 let mut state = self.state.lock();
227 let events = mem::take(&mut state.queue);
228 state.flush_task.take();
229 drop(state);
230
231 if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
232 let this = self.clone();
233 self.executor
234 .spawn(
235 async move {
236 let mut json_bytes = Vec::new();
237
238 if let Some(file) = &mut this.state.lock().log_file {
239 let file = file.as_file_mut();
240 for event in &events {
241 json_bytes.clear();
242 serde_json::to_writer(&mut json_bytes, event)?;
243 file.write_all(&json_bytes)?;
244 file.write(b"\n")?;
245 }
246 }
247
248 let batch = AmplitudeEventBatch {
249 api_key,
250 events,
251 options: AmplitudeEventBatchOptions { min_id_length: 1 },
252 };
253 json_bytes.clear();
254 serde_json::to_writer(&mut json_bytes, &batch)?;
255 let request =
256 Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?;
257 this.http_client.send(request).await?;
258 Ok(())
259 }
260 .log_err(),
261 )
262 .detach();
263 }
264 }
265}