1mod event_coalescer;
2
3use crate::{ChannelId, TelemetrySettings};
4use chrono::{DateTime, Utc};
5use clock::SystemClock;
6use futures::Future;
7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
8use once_cell::sync::Lazy;
9use parking_lot::Mutex;
10use release_channel::ReleaseChannel;
11use settings::{Settings, SettingsStore};
12use sha2::{Digest, Sha256};
13use std::io::Write;
14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
15use sysinfo::{
16 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
17};
18use telemetry_events::{
19 ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
20 EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
21};
22use tempfile::NamedTempFile;
23use util::http::{self, HttpClient, HttpClientWithUrl, Method};
24#[cfg(not(debug_assertions))]
25use util::ResultExt;
26use util::TryFutureExt;
27
28use self::event_coalescer::EventCoalescer;
29
/// Collects telemetry events, batches them, and uploads them to the Zed API.
///
/// Constructed via `Telemetry::new`, which hands out an `Arc<Telemetry>`; all mutable data
/// lives in [`TelemetryState`] behind a mutex so the value can be shared across tasks.
pub struct Telemetry {
    /// Time source used to timestamp queued events.
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to POST event batches to `/telemetry/events`.
    http_client: Arc<HttpClientWithUrl>,
    /// Executor that runs the flush timer and the upload tasks.
    executor: BackgroundExecutor,
    /// Shared mutable state, guarded by a `parking_lot` mutex.
    state: Arc<Mutex<TelemetryState>>,
}
36
/// Mutable telemetry data shared between the API surface and background tasks.
struct TelemetryState {
    /// Snapshot of [`TelemetrySettings`], refreshed whenever the global `SettingsStore` changes.
    settings: TelemetrySettings,
    /// Per logged-in user; set by `set_authenticated_user_info`.
    metrics_id: Option<Arc<str>>,
    /// Per app installation (different for dev, nightly, preview, and stable).
    installation_id: Option<Arc<str>>,
    /// Per app launch.
    session_id: Option<String>,
    /// Display name of the release channel, if a `ReleaseChannel` global was set at startup.
    release_channel: Option<&'static str>,
    /// App version / OS information included in every uploaded batch.
    app_metadata: AppMetadata,
    /// CPU architecture the binary was compiled for (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting to be flushed.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when no timed
    /// flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Release builds only: temporary file in the config dir that receives one JSON line per
    /// flushed event.
    log_file: Option<NamedTempFile>,
    /// Whether the signed-in user is staff; `None` until authentication info is recorded.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; reset on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Turns individual edits into (start, end, environment) periods reported as edit events.
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush (defaults to `MAX_QUEUE_LEN`).
    max_queue_size: usize,
}
53
/// Default number of queued events that triggers an immediate flush (once an installation id
/// is known). Debug builds use a tiny batch so the upload path is exercised quickly.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Delay between the first queued event of a batch and the timed flush of that batch.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};

65static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
66 option_env!("ZED_CLIENT_CHECKSUM_SEED")
67 .map(|s| s.as_bytes().into())
68 .or_else(|| {
69 env::var("ZED_CLIENT_CHECKSUM_SEED")
70 .ok()
71 .map(|s| s.as_bytes().into())
72 })
73});
74
75impl Telemetry {
76 pub fn new(
77 clock: Arc<dyn SystemClock>,
78 client: Arc<HttpClientWithUrl>,
79 cx: &mut AppContext,
80 ) -> Arc<Self> {
81 let release_channel =
82 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
83
84 TelemetrySettings::register(cx);
85
86 let state = Arc::new(Mutex::new(TelemetryState {
87 settings: TelemetrySettings::get_global(cx).clone(),
88 app_metadata: cx.app_metadata(),
89 architecture: env::consts::ARCH,
90 release_channel,
91 installation_id: None,
92 metrics_id: None,
93 session_id: None,
94 events_queue: Vec::new(),
95 flush_events_task: None,
96 log_file: None,
97 is_staff: None,
98 first_event_date_time: None,
99 event_coalescer: EventCoalescer::new(clock.clone()),
100 max_queue_size: MAX_QUEUE_LEN,
101 }));
102
103 #[cfg(not(debug_assertions))]
104 cx.background_executor()
105 .spawn({
106 let state = state.clone();
107 async move {
108 if let Some(tempfile) =
109 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
110 {
111 state.lock().log_file = Some(tempfile);
112 }
113 }
114 })
115 .detach();
116
117 cx.observe_global::<SettingsStore>({
118 let state = state.clone();
119
120 move |cx| {
121 let mut state = state.lock();
122 state.settings = TelemetrySettings::get_global(cx).clone();
123 }
124 })
125 .detach();
126
127 // TODO: Replace all hardware stuff with nested SystemSpecs json
128 let this = Arc::new(Self {
129 clock,
130 http_client: client,
131 executor: cx.background_executor().clone(),
132 state,
133 });
134
135 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
136 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
137 std::mem::forget(cx.on_app_quit({
138 let this = this.clone();
139 move |_| this.shutdown_telemetry()
140 }));
141
142 this
143 }
144
145 #[cfg(any(test, feature = "test-support"))]
146 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
147 Task::ready(())
148 }
149
150 // Skip calling this function in tests.
151 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
152 #[cfg(not(any(test, feature = "test-support")))]
153 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
154 self.report_app_event("close".to_string());
155 // TODO: close final edit period and make sure it's sent
156 Task::ready(())
157 }
158
159 pub fn log_file_path(&self) -> Option<PathBuf> {
160 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
161 }
162
163 pub fn start(
164 self: &Arc<Self>,
165 installation_id: Option<String>,
166 session_id: String,
167 cx: &mut AppContext,
168 ) {
169 let mut state = self.state.lock();
170 state.installation_id = installation_id.map(|id| id.into());
171 state.session_id = Some(session_id.into());
172 drop(state);
173
174 let this = self.clone();
175 cx.spawn(|_| async move {
176 // Avoiding calling `System::new_all()`, as there have been crashes related to it
177 let refresh_kind = RefreshKind::new()
178 .with_memory() // For memory usage
179 .with_processes(ProcessRefreshKind::everything()) // For process usage
180 .with_cpu(CpuRefreshKind::everything()); // For core count
181
182 let mut system = System::new_with_specifics(refresh_kind);
183
184 // Avoiding calling `refresh_all()`, just update what we need
185 system.refresh_specifics(refresh_kind);
186
187 // Waiting some amount of time before the first query is important to get a reasonable value
188 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
189 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
190
191 loop {
192 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
193
194 system.refresh_specifics(refresh_kind);
195
196 let current_process = Pid::from_u32(std::process::id());
197 let Some(process) = system.processes().get(¤t_process) else {
198 let process = current_process;
199 log::error!("Failed to find own process {process:?} in system process table");
200 // TODO: Fire an error telemetry event
201 return;
202 };
203
204 this.report_memory_event(process.memory(), process.virtual_memory());
205 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
206 }
207 })
208 .detach();
209 }
210
211 pub fn set_authenticated_user_info(
212 self: &Arc<Self>,
213 metrics_id: Option<String>,
214 is_staff: bool,
215 ) {
216 let mut state = self.state.lock();
217
218 if !state.settings.metrics {
219 return;
220 }
221
222 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
223 state.metrics_id = metrics_id.clone();
224 state.is_staff = Some(is_staff);
225 drop(state);
226 }
227
228 pub fn report_editor_event(
229 self: &Arc<Self>,
230 file_extension: Option<String>,
231 vim_mode: bool,
232 operation: &'static str,
233 copilot_enabled: bool,
234 copilot_enabled_for_language: bool,
235 ) {
236 let event = Event::Editor(EditorEvent {
237 file_extension,
238 vim_mode,
239 operation: operation.into(),
240 copilot_enabled,
241 copilot_enabled_for_language,
242 });
243
244 self.report_event(event)
245 }
246
247 pub fn report_copilot_event(
248 self: &Arc<Self>,
249 suggestion_id: Option<String>,
250 suggestion_accepted: bool,
251 file_extension: Option<String>,
252 ) {
253 let event = Event::Copilot(CopilotEvent {
254 suggestion_id,
255 suggestion_accepted,
256 file_extension,
257 });
258
259 self.report_event(event)
260 }
261
262 pub fn report_assistant_event(
263 self: &Arc<Self>,
264 conversation_id: Option<String>,
265 kind: AssistantKind,
266 model: &'static str,
267 ) {
268 let event = Event::Assistant(AssistantEvent {
269 conversation_id,
270 kind,
271 model: model.to_string(),
272 });
273
274 self.report_event(event)
275 }
276
277 pub fn report_call_event(
278 self: &Arc<Self>,
279 operation: &'static str,
280 room_id: Option<u64>,
281 channel_id: Option<ChannelId>,
282 ) {
283 let event = Event::Call(CallEvent {
284 operation: operation.to_string(),
285 room_id,
286 channel_id: channel_id.map(|cid| cid.0),
287 });
288
289 self.report_event(event)
290 }
291
292 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
293 let event = Event::Cpu(CpuEvent {
294 usage_as_percentage,
295 core_count,
296 });
297
298 self.report_event(event)
299 }
300
301 pub fn report_memory_event(
302 self: &Arc<Self>,
303 memory_in_bytes: u64,
304 virtual_memory_in_bytes: u64,
305 ) {
306 let event = Event::Memory(MemoryEvent {
307 memory_in_bytes,
308 virtual_memory_in_bytes,
309 });
310
311 self.report_event(event)
312 }
313
314 pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
315 let event = Event::App(AppEvent { operation });
316
317 self.report_event(event.clone());
318
319 event
320 }
321
322 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
323 let event = Event::Setting(SettingEvent {
324 setting: setting.to_string(),
325 value,
326 });
327
328 self.report_event(event)
329 }
330
331 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
332 let mut state = self.state.lock();
333 let period_data = state.event_coalescer.log_event(environment);
334 drop(state);
335
336 if let Some((start, end, environment)) = period_data {
337 let event = Event::Edit(EditEvent {
338 duration: end.timestamp_millis() - start.timestamp_millis(),
339 environment: environment.to_string(),
340 });
341
342 self.report_event(event);
343 }
344 }
345
346 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
347 let event = Event::Action(ActionEvent {
348 source: source.to_string(),
349 action,
350 });
351
352 self.report_event(event)
353 }
354
355 fn report_event(self: &Arc<Self>, event: Event) {
356 let mut state = self.state.lock();
357
358 if !state.settings.metrics {
359 return;
360 }
361
362 if state.flush_events_task.is_none() {
363 let this = self.clone();
364 let executor = self.executor.clone();
365 state.flush_events_task = Some(self.executor.spawn(async move {
366 executor.timer(FLUSH_INTERVAL).await;
367 this.flush_events();
368 }));
369 }
370
371 let date_time = self.clock.utc_now();
372
373 let milliseconds_since_first_event = match state.first_event_date_time {
374 Some(first_event_date_time) => {
375 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
376 }
377 None => {
378 state.first_event_date_time = Some(date_time);
379 0
380 }
381 };
382
383 let signed_in = state.metrics_id.is_some();
384 state.events_queue.push(EventWrapper {
385 signed_in,
386 milliseconds_since_first_event,
387 event,
388 });
389
390 if state.installation_id.is_some() {
391 if state.events_queue.len() >= state.max_queue_size {
392 drop(state);
393 self.flush_events();
394 }
395 }
396 }
397
398 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
399 self.state.lock().metrics_id.clone()
400 }
401
402 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
403 self.state.lock().installation_id.clone()
404 }
405
406 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
407 self.state.lock().is_staff
408 }
409
410 pub fn flush_events(self: &Arc<Self>) {
411 let mut state = self.state.lock();
412 state.first_event_date_time = None;
413 let mut events = mem::take(&mut state.events_queue);
414 state.flush_events_task.take();
415 drop(state);
416 if events.is_empty() {
417 return;
418 }
419
420 let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
421 return;
422 };
423
424 let this = self.clone();
425 self.executor
426 .spawn(
427 async move {
428 let mut json_bytes = Vec::new();
429
430 if let Some(file) = &mut this.state.lock().log_file {
431 let file = file.as_file_mut();
432 for event in &mut events {
433 json_bytes.clear();
434 serde_json::to_writer(&mut json_bytes, event)?;
435 file.write_all(&json_bytes)?;
436 file.write(b"\n")?;
437 }
438 }
439
440 {
441 let state = this.state.lock();
442 let request_body = EventRequestBody {
443 installation_id: state.installation_id.as_deref().map(Into::into),
444 session_id: state.session_id.clone(),
445 is_staff: state.is_staff.clone(),
446 app_version: state
447 .app_metadata
448 .app_version
449 .unwrap_or_default()
450 .to_string(),
451 os_name: state.app_metadata.os_name.to_string(),
452 os_version: state
453 .app_metadata
454 .os_version
455 .map(|version| version.to_string()),
456 architecture: state.architecture.to_string(),
457
458 release_channel: state.release_channel.map(Into::into),
459 events,
460 };
461 json_bytes.clear();
462 serde_json::to_writer(&mut json_bytes, &request_body)?;
463 }
464
465 let mut summer = Sha256::new();
466 summer.update(checksum_seed);
467 summer.update(&json_bytes);
468 summer.update(checksum_seed);
469 let mut checksum = String::new();
470 for byte in summer.finalize().as_slice() {
471 use std::fmt::Write;
472 write!(&mut checksum, "{:02x}", byte).unwrap();
473 }
474
475 let request = http::Request::builder()
476 .method(Method::POST)
477 .uri(this.http_client.build_zed_api_url("/telemetry/events"))
478 .header("Content-Type", "text/plain")
479 .header("x-zed-checksum", checksum)
480 .body(json_bytes.into());
481
482 let response = this.http_client.send(request?).await?;
483 if response.status() != 200 {
484 log::error!("Failed to send events: HTTP {:?}", response.status());
485 }
486 anyhow::Ok(())
487 }
488 .log_err(),
489 )
490 .detach();
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Once an installation id is set, reaching `max_queue_size` flushes immediately without
    /// waiting for the flush timer.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            // The first three events stay queued and keep the first event's timestamp.
            for expected_len in 1..=3 {
                let event = telemetry.report_app_event(operation.clone());
                assert_eq!(
                    event,
                    Event::App(AppEvent {
                        operation: operation.clone(),
                    })
                );
                assert_queued(&telemetry, expected_len, first_date_time);

                clock.advance(chrono::Duration::milliseconds(100));
            }

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(event, Event::App(AppEvent { operation }));

            assert!(is_empty_state(&telemetry));
        });
    }

    /// A queued event is flushed exactly when `FLUSH_INTERVAL` elapses, even though the queue
    /// never reaches `max_queue_size`.
    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(event, Event::App(AppEvent { operation }));
            assert_queued(&telemetry, 1, first_date_time);

            let duration = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - duration);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(duration);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` global so `TelemetrySettings` can be registered.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Asserts that `expected_len` events are queued, a timed flush is pending, and the batch
    /// still reports `first_date_time` as its first-event timestamp.
    fn assert_queued(telemetry: &Telemetry, expected_len: usize, first_date_time: DateTime<Utc>) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued, no flush is pending and the batch timestamp is reset, i.e.
    /// the state right after a flush. Locks once so all checks see one consistent snapshot.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}