1use crate::http::HttpClient;
2use db::Db;
3use gpui::{
4 executor::Background,
5 serde_json::{self, value::Map, Value},
6 AppContext, Task,
7};
8use isahc::Request;
9use lazy_static::lazy_static;
10use parking_lot::Mutex;
11use serde::Serialize;
12use std::{
13 io::Write,
14 mem,
15 path::PathBuf,
16 sync::Arc,
17 time::{Duration, SystemTime, UNIX_EPOCH},
18};
19use tempfile::NamedTempFile;
20use util::{post_inc, ResultExt, TryFutureExt};
21use uuid::Uuid;
22
23pub struct Telemetry {
24 http_client: Arc<dyn HttpClient>,
25 executor: Arc<Background>,
26 session_id: u128,
27 state: Mutex<TelemetryState>,
28}
29
30#[derive(Default)]
31struct TelemetryState {
32 user_id: Option<Arc<str>>,
33 device_id: Option<Arc<str>>,
34 app_version: Option<Arc<str>>,
35 os_version: Option<Arc<str>>,
36 os_name: &'static str,
37 queue: Vec<AmplitudeEvent>,
38 next_event_id: usize,
39 flush_task: Option<Task<()>>,
40 log_file: Option<NamedTempFile>,
41}
42
43const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch";
44
45lazy_static! {
46 static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
47 .ok()
48 .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
49}
50
51#[derive(Serialize)]
52struct AmplitudeEventBatch {
53 api_key: &'static str,
54 events: Vec<AmplitudeEvent>,
55}
56
57#[derive(Serialize)]
58struct AmplitudeEvent {
59 #[serde(skip_serializing_if = "Option::is_none")]
60 user_id: Option<Arc<str>>,
61 device_id: Option<Arc<str>>,
62 event_type: String,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 event_properties: Option<Map<String, Value>>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 user_properties: Option<Map<String, Value>>,
67 os_name: &'static str,
68 os_version: Option<Arc<str>>,
69 app_version: Option<Arc<str>>,
70 event_id: usize,
71 session_id: u128,
72 time: u128,
73}
74
75#[cfg(debug_assertions)]
76const MAX_QUEUE_LEN: usize = 1;
77
78#[cfg(not(debug_assertions))]
79const MAX_QUEUE_LEN: usize = 10;
80
81#[cfg(debug_assertions)]
82const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
83
84#[cfg(not(debug_assertions))]
85const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
86
87impl Telemetry {
88 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
89 let platform = cx.platform();
90 let this = Arc::new(Self {
91 http_client: client,
92 executor: cx.background().clone(),
93 session_id: SystemTime::now()
94 .duration_since(UNIX_EPOCH)
95 .unwrap()
96 .as_millis(),
97 state: Mutex::new(TelemetryState {
98 os_version: platform
99 .os_version()
100 .log_err()
101 .map(|v| v.to_string().into()),
102 os_name: platform.os_name().into(),
103 app_version: platform
104 .app_version()
105 .log_err()
106 .map(|v| v.to_string().into()),
107 device_id: None,
108 queue: Default::default(),
109 flush_task: Default::default(),
110 next_event_id: 0,
111 log_file: None,
112 user_id: None,
113 }),
114 });
115
116 if AMPLITUDE_API_KEY.is_some() {
117 this.executor
118 .spawn({
119 let this = this.clone();
120 async move {
121 if let Some(tempfile) = NamedTempFile::new().log_err() {
122 this.state.lock().log_file = Some(tempfile);
123 }
124 }
125 })
126 .detach();
127 }
128
129 this
130 }
131
132 pub fn log_file_path(&self) -> Option<PathBuf> {
133 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
134 }
135
136 pub fn start(self: &Arc<Self>, db: Arc<Db>) {
137 let this = self.clone();
138 self.executor
139 .spawn(
140 async move {
141 let device_id = if let Some(device_id) = db
142 .read(["device_id"])?
143 .into_iter()
144 .flatten()
145 .next()
146 .and_then(|bytes| String::from_utf8(bytes).ok())
147 {
148 device_id
149 } else {
150 let device_id = Uuid::new_v4().to_string();
151 db.write([("device_id", device_id.as_bytes())])?;
152 device_id
153 };
154
155 let device_id = Some(Arc::from(device_id));
156 let mut state = this.state.lock();
157 state.device_id = device_id.clone();
158 for event in &mut state.queue {
159 event.device_id = device_id.clone();
160 }
161 if !state.queue.is_empty() {
162 drop(state);
163 this.flush();
164 }
165
166 anyhow::Ok(())
167 }
168 .log_err(),
169 )
170 .detach();
171 }
172
173 pub fn set_user_id(&self, user_id: Option<u64>) {
174 self.state.lock().user_id = user_id.map(|id| id.to_string().into());
175 }
176
177 pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
178 if AMPLITUDE_API_KEY.is_none() {
179 return;
180 }
181
182 let mut state = self.state.lock();
183 let event = AmplitudeEvent {
184 event_type: kind.to_string(),
185 time: SystemTime::now()
186 .duration_since(UNIX_EPOCH)
187 .unwrap()
188 .as_millis(),
189 session_id: self.session_id,
190 event_properties: if let Value::Object(properties) = properties {
191 Some(properties)
192 } else {
193 None
194 },
195 user_properties: None,
196 user_id: state.user_id.clone(),
197 device_id: state.device_id.clone(),
198 os_name: state.os_name,
199 os_version: state.os_version.clone(),
200 app_version: state.app_version.clone(),
201 event_id: post_inc(&mut state.next_event_id),
202 };
203 state.queue.push(event);
204 if state.device_id.is_some() {
205 if state.queue.len() >= MAX_QUEUE_LEN {
206 drop(state);
207 self.flush();
208 } else {
209 let this = self.clone();
210 let executor = self.executor.clone();
211 state.flush_task = Some(self.executor.spawn(async move {
212 executor.timer(DEBOUNCE_INTERVAL).await;
213 this.flush();
214 }));
215 }
216 }
217 }
218
219 fn flush(self: &Arc<Self>) {
220 let mut state = self.state.lock();
221 let events = mem::take(&mut state.queue);
222 state.flush_task.take();
223 drop(state);
224
225 if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
226 let this = self.clone();
227 self.executor
228 .spawn(
229 async move {
230 let mut json_bytes = Vec::new();
231
232 if let Some(file) = &mut this.state.lock().log_file {
233 let file = file.as_file_mut();
234 for event in &events {
235 json_bytes.clear();
236 serde_json::to_writer(&mut json_bytes, event)?;
237 file.write_all(&json_bytes)?;
238 file.write(b"\n")?;
239 }
240 }
241
242 let batch = AmplitudeEventBatch { api_key, events };
243 json_bytes.clear();
244 serde_json::to_writer(&mut json_bytes, &batch)?;
245 let request =
246 Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?;
247 this.http_client.send(request).await?;
248 Ok(())
249 }
250 .log_err(),
251 )
252 .detach();
253 }
254 }
255}