1use crate::{http::HttpClient, ZED_SECRET_CLIENT_TOKEN};
2use gpui::{
3 executor::Background,
4 serde_json::{self, value::Map, Value},
5 AppContext, Task,
6};
7use isahc::Request;
8use parking_lot::Mutex;
9use serde::Serialize;
10use std::{
11 mem,
12 sync::Arc,
13 time::{Duration, SystemTime, UNIX_EPOCH},
14};
15use util::{post_inc, ResultExt};
16
17pub struct Telemetry {
18 client: Arc<dyn HttpClient>,
19 executor: Arc<Background>,
20 session_id: u128,
21 state: Mutex<TelemetryState>,
22}
23
24#[derive(Default)]
25struct TelemetryState {
26 user_id: Option<Arc<str>>,
27 device_id: Option<Arc<str>>,
28 app_version: Option<Arc<str>>,
29 os_version: Option<Arc<str>>,
30 os_name: &'static str,
31 queue: Vec<AmplitudeEvent>,
32 next_event_id: usize,
33 flush_task: Option<Task<()>>,
34}
35
/// Endpoint that event batches are POSTed to.
///
/// The scheme separator must be `://`; without the colon the string is not a
/// valid absolute URL and building the request fails.
const AMPLITUDE_EVENTS_URL: &str = "https://api2.amplitude.com/batch";
37
38#[derive(Serialize)]
39struct AmplitudeEventBatch {
40 api_key: &'static str,
41 events: Vec<AmplitudeEvent>,
42}
43
44#[derive(Serialize)]
45struct AmplitudeEvent {
46 user_id: Option<Arc<str>>,
47 device_id: Option<Arc<str>>,
48 event_type: String,
49 event_properties: Option<Map<String, Value>>,
50 user_properties: Option<Map<String, Value>>,
51 os_name: &'static str,
52 os_version: Option<Arc<str>>,
53 app_version: Option<Arc<str>>,
54 event_id: usize,
55 session_id: u128,
56 time: u128,
57}
58
/// Queue length at which `log_event` flushes immediately instead of waiting
/// for the debounce timer.
///
/// Builds with debug assertions use 1, so every event is sent as soon as it is
/// logged; all other builds batch up to 10 events.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };
64
/// How long `log_event` waits after the most recent event before flushing a
/// queue that has not yet reached `MAX_QUEUE_LEN`.
const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
66
67impl Telemetry {
68 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
69 let platform = cx.platform();
70 Arc::new(Self {
71 client,
72 executor: cx.background().clone(),
73 session_id: SystemTime::now()
74 .duration_since(UNIX_EPOCH)
75 .unwrap()
76 .as_millis(),
77 state: Mutex::new(TelemetryState {
78 os_version: platform
79 .os_version()
80 .log_err()
81 .map(|v| v.to_string().into()),
82 os_name: platform.os_name().into(),
83 app_version: platform
84 .app_version()
85 .log_err()
86 .map(|v| v.to_string().into()),
87 device_id: None,
88 queue: Default::default(),
89 flush_task: Default::default(),
90 next_event_id: 0,
91 user_id: None,
92 }),
93 })
94 }
95
96 pub fn log_event(self: &Arc<Self>, kind: &str, properties: Value) {
97 let mut state = self.state.lock();
98 let event = AmplitudeEvent {
99 event_type: kind.to_string(),
100 time: SystemTime::now()
101 .duration_since(UNIX_EPOCH)
102 .unwrap()
103 .as_millis(),
104 session_id: self.session_id,
105 event_properties: if let Value::Object(properties) = properties {
106 Some(properties)
107 } else {
108 None
109 },
110 user_properties: None,
111 user_id: state.user_id.clone(),
112 device_id: state.device_id.clone(),
113 os_name: state.os_name,
114 os_version: state.os_version.clone(),
115 app_version: state.app_version.clone(),
116 event_id: post_inc(&mut state.next_event_id),
117 };
118 state.queue.push(event);
119 if state.queue.len() >= MAX_QUEUE_LEN {
120 drop(state);
121 self.flush();
122 } else {
123 let this = self.clone();
124 let executor = self.executor.clone();
125 state.flush_task = Some(self.executor.spawn(async move {
126 executor.timer(DEBOUNCE_INTERVAL).await;
127 this.flush();
128 }));
129 }
130 }
131
132 fn flush(&self) {
133 let mut state = self.state.lock();
134 let events = mem::take(&mut state.queue);
135 let client = self.client.clone();
136 state.flush_task.take();
137 self.executor
138 .spawn(async move {
139 let body = serde_json::to_vec(&AmplitudeEventBatch {
140 api_key: ZED_SECRET_CLIENT_TOKEN,
141 events,
142 })
143 .log_err()?;
144 let request = Request::post(AMPLITUDE_EVENTS_URL)
145 .header("Content-Type", "application/json")
146 .body(body.into())
147 .log_err()?;
148 client.send(request).await.log_err();
149 Some(())
150 })
151 .detach();
152 }
153}