1use crate::http::HttpClient;
2use db::Db;
3use gpui::{
4 executor::Background,
5 serde_json::{self, value::Map, Value},
6 AppContext, Task,
7};
8use isahc::Request;
9use lazy_static::lazy_static;
10use parking_lot::Mutex;
11use serde::Serialize;
12use std::{
13 mem,
14 sync::Arc,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use util::{post_inc, ResultExt, TryFutureExt};
18use uuid::Uuid;
19
20pub struct Telemetry {
21 client: Arc<dyn HttpClient>,
22 executor: Arc<Background>,
23 session_id: u128,
24 state: Mutex<TelemetryState>,
25}
26
27#[derive(Default)]
28struct TelemetryState {
29 user_id: Option<Arc<str>>,
30 device_id: Option<Arc<str>>,
31 app_version: Option<Arc<str>>,
32 os_version: Option<Arc<str>>,
33 os_name: &'static str,
34 queue: Vec<AmplitudeEvent>,
35 next_event_id: usize,
36 flush_task: Option<Task<()>>,
37}
38
39const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch";
40
41lazy_static! {
42 static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
43 .ok()
44 .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
45}
46
47#[derive(Serialize)]
48struct AmplitudeEventBatch {
49 api_key: &'static str,
50 events: Vec<AmplitudeEvent>,
51}
52
53#[derive(Serialize)]
54struct AmplitudeEvent {
55 user_id: Option<Arc<str>>,
56 device_id: Option<Arc<str>>,
57 event_type: String,
58 event_properties: Option<Map<String, Value>>,
59 user_properties: Option<Map<String, Value>>,
60 os_name: &'static str,
61 os_version: Option<Arc<str>>,
62 app_version: Option<Arc<str>>,
63 event_id: usize,
64 session_id: u128,
65 time: u128,
66}
67
68#[cfg(debug_assertions)]
69const MAX_QUEUE_LEN: usize = 1;
70
71#[cfg(not(debug_assertions))]
72const MAX_QUEUE_LEN: usize = 10;
73
74#[cfg(debug_assertions)]
75const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
76
77#[cfg(not(debug_assertions))]
78const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
79
80impl Telemetry {
81 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
82 let platform = cx.platform();
83 Arc::new(Self {
84 client,
85 executor: cx.background().clone(),
86 session_id: SystemTime::now()
87 .duration_since(UNIX_EPOCH)
88 .unwrap()
89 .as_millis(),
90 state: Mutex::new(TelemetryState {
91 os_version: platform
92 .os_version()
93 .log_err()
94 .map(|v| v.to_string().into()),
95 os_name: platform.os_name().into(),
96 app_version: platform
97 .app_version()
98 .log_err()
99 .map(|v| v.to_string().into()),
100 device_id: None,
101 queue: Default::default(),
102 flush_task: Default::default(),
103 next_event_id: 0,
104 user_id: None,
105 }),
106 })
107 }
108
109 pub fn start(self: &Arc<Self>, db: Arc<Db>) {
110 let this = self.clone();
111 self.executor
112 .spawn(
113 async move {
114 let device_id = if let Some(device_id) = db
115 .read(["device_id"])?
116 .into_iter()
117 .flatten()
118 .next()
119 .and_then(|bytes| String::from_utf8(bytes).ok())
120 {
121 device_id
122 } else {
123 let device_id = Uuid::new_v4().to_string();
124 db.write([("device_id", device_id.as_bytes())])?;
125 device_id
126 };
127
128 let device_id = Some(Arc::from(device_id));
129 let mut state = this.state.lock();
130 state.device_id = device_id.clone();
131 for event in &mut state.queue {
132 event.device_id = device_id.clone();
133 }
134 if !state.queue.is_empty() {
135 drop(state);
136 this.flush();
137 }
138
139 anyhow::Ok(())
140 }
141 .log_err(),
142 )
143 .detach();
144 }
145
146 pub fn set_user_id(&self, user_id: Option<u64>) {
147 self.state.lock().user_id = user_id.map(|id| id.to_string().into());
148 }
149
150 pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
151 if AMPLITUDE_API_KEY.is_none() {
152 return;
153 }
154
155 let mut state = self.state.lock();
156 let event = AmplitudeEvent {
157 event_type: kind.to_string(),
158 time: SystemTime::now()
159 .duration_since(UNIX_EPOCH)
160 .unwrap()
161 .as_millis(),
162 session_id: self.session_id,
163 event_properties: if let Value::Object(properties) = properties {
164 Some(properties)
165 } else {
166 None
167 },
168 user_properties: None,
169 user_id: state.user_id.clone(),
170 device_id: state.device_id.clone(),
171 os_name: state.os_name,
172 os_version: state.os_version.clone(),
173 app_version: state.app_version.clone(),
174 event_id: post_inc(&mut state.next_event_id),
175 };
176 state.queue.push(event);
177 if state.device_id.is_some() {
178 if state.queue.len() >= MAX_QUEUE_LEN {
179 drop(state);
180 self.flush();
181 } else {
182 let this = self.clone();
183 let executor = self.executor.clone();
184 state.flush_task = Some(self.executor.spawn(async move {
185 executor.timer(DEBOUNCE_INTERVAL).await;
186 this.flush();
187 }));
188 }
189 }
190 }
191
192 fn flush(&self) {
193 let mut state = self.state.lock();
194 let events = mem::take(&mut state.queue);
195 state.flush_task.take();
196
197 if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
198 let client = self.client.clone();
199 self.executor
200 .spawn(async move {
201 let batch = AmplitudeEventBatch { api_key, events };
202 let body = serde_json::to_vec(&batch).log_err()?;
203 let request = Request::post(AMPLITUDE_EVENTS_URL)
204 .body(body.into())
205 .log_err()?;
206 client.send(request).await.log_err();
207 Some(())
208 })
209 .detach();
210 }
211 }
212}