1mod event_coalescer;
2
3use crate::{ChannelId, TelemetrySettings};
4use chrono::{DateTime, Utc};
5use clock::SystemClock;
6use futures::Future;
7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
8use once_cell::sync::Lazy;
9use parking_lot::Mutex;
10use release_channel::ReleaseChannel;
11use settings::{Settings, SettingsStore};
12use sha2::{Digest, Sha256};
13use std::io::Write;
14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
15use sysinfo::{CpuRefreshKind, MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
16use telemetry_events::{
17 ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
18 EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent,
19 SettingEvent,
20};
21use tempfile::NamedTempFile;
22use util::http::{self, HttpClient, HttpClientWithUrl, Method};
23#[cfg(not(debug_assertions))]
24use util::ResultExt;
25use util::TryFutureExt;
26
27use self::event_coalescer::EventCoalescer;
28
/// Client-side telemetry: queues events in memory and periodically uploads them over HTTP.
///
/// Constructed via `Telemetry::new` and shared as an `Arc`; only one instance is expected to exist.
pub struct Telemetry {
    /// Source of event timestamps; `new` also hands a clone to the edit-event coalescer.
    clock: Arc<dyn SystemClock>,
    /// Used to build the `/telemetry/events` URL and to send event batches.
    http_client: Arc<HttpClientWithUrl>,
    /// Runs the flush timer and the upload tasks.
    executor: BackgroundExecutor,
    /// Mutable state shared with spawned tasks and the settings observer.
    state: Arc<Mutex<TelemetryState>>,
}
35
/// Lock-protected state behind `Telemetry::state`.
struct TelemetryState {
    /// Snapshot of `TelemetrySettings`, refreshed whenever the global `SettingsStore` changes.
    settings: TelemetrySettings,
    /// Per logged-in user.
    metrics_id: Option<Arc<str>>,
    /// Per app installation (different for dev, nightly, preview, and stable).
    installation_id: Option<Arc<str>>,
    /// Per app launch.
    session_id: Option<String>,
    /// Display name of the global `ReleaseChannel`, when one is set.
    release_channel: Option<&'static str>,
    app_metadata: AppMetadata,
    /// Target CPU architecture (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting for the next flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer-driven flush; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Local newline-delimited JSON copy of flushed events, if a log file was created.
    log_file: Option<NamedTempFile>,
    is_staff: Option<bool>,
    /// Timestamp of the first event queued since the last flush; used to compute
    /// `milliseconds_since_first_event`.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Turns individual edits into edit periods (see `log_edit_event`).
    event_coalescer: EventCoalescer,
    /// Queue length at which an immediate flush is triggered (once an installation id is known).
    max_queue_size: usize,
}
52
/// Default number of queued events that forces an immediate flush.
/// Debug builds use a tiny queue so flushing is exercised often during development.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// How long after the first queued event a timer-driven flush fires.
/// Debug builds flush every second; release builds every five minutes.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
64static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
65 option_env!("ZED_CLIENT_CHECKSUM_SEED")
66 .map(|s| s.as_bytes().into())
67 .or_else(|| {
68 env::var("ZED_CLIENT_CHECKSUM_SEED")
69 .ok()
70 .map(|s| s.as_bytes().into())
71 })
72});
73
74impl Telemetry {
75 pub fn new(
76 clock: Arc<dyn SystemClock>,
77 client: Arc<HttpClientWithUrl>,
78 cx: &mut AppContext,
79 ) -> Arc<Self> {
80 let release_channel =
81 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
82
83 TelemetrySettings::register(cx);
84
85 let state = Arc::new(Mutex::new(TelemetryState {
86 settings: *TelemetrySettings::get_global(cx),
87 app_metadata: cx.app_metadata(),
88 architecture: env::consts::ARCH,
89 release_channel,
90 installation_id: None,
91 metrics_id: None,
92 session_id: None,
93 events_queue: Vec::new(),
94 flush_events_task: None,
95 log_file: None,
96 is_staff: None,
97 first_event_date_time: None,
98 event_coalescer: EventCoalescer::new(clock.clone()),
99 max_queue_size: MAX_QUEUE_LEN,
100 }));
101
102 #[cfg(not(debug_assertions))]
103 cx.background_executor()
104 .spawn({
105 let state = state.clone();
106 async move {
107 if let Some(tempfile) =
108 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
109 {
110 state.lock().log_file = Some(tempfile);
111 }
112 }
113 })
114 .detach();
115
116 cx.observe_global::<SettingsStore>({
117 let state = state.clone();
118
119 move |cx| {
120 let mut state = state.lock();
121 state.settings = *TelemetrySettings::get_global(cx);
122 }
123 })
124 .detach();
125
126 // TODO: Replace all hardware stuff with nested SystemSpecs json
127 let this = Arc::new(Self {
128 clock,
129 http_client: client,
130 executor: cx.background_executor().clone(),
131 state,
132 });
133
134 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
135 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
136 std::mem::forget(cx.on_app_quit({
137 let this = this.clone();
138 move |_| this.shutdown_telemetry()
139 }));
140
141 this
142 }
143
144 #[cfg(any(test, feature = "test-support"))]
145 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
146 Task::ready(())
147 }
148
149 // Skip calling this function in tests.
150 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
151 #[cfg(not(any(test, feature = "test-support")))]
152 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
153 self.report_app_event("close".to_string());
154 // TODO: close final edit period and make sure it's sent
155 Task::ready(())
156 }
157
158 pub fn log_file_path(&self) -> Option<PathBuf> {
159 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
160 }
161
162 pub fn start(
163 self: &Arc<Self>,
164 installation_id: Option<String>,
165 session_id: String,
166 cx: &mut AppContext,
167 ) {
168 let mut state = self.state.lock();
169 state.installation_id = installation_id.map(|id| id.into());
170 state.session_id = Some(session_id);
171 drop(state);
172
173 let this = self.clone();
174 cx.spawn(|_| async move {
175 // Avoiding calling `System::new_all()`, as there have been crashes related to it
176 let refresh_kind = RefreshKind::new()
177 .with_memory(MemoryRefreshKind::everything()) // For memory usage
178 .with_processes(ProcessRefreshKind::everything()) // For process usage
179 .with_cpu(CpuRefreshKind::everything()); // For core count
180
181 let mut system = System::new_with_specifics(refresh_kind);
182
183 // Avoiding calling `refresh_all()`, just update what we need
184 system.refresh_specifics(refresh_kind);
185
186 // Waiting some amount of time before the first query is important to get a reasonable value
187 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
188 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
189
190 loop {
191 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
192
193 system.refresh_specifics(refresh_kind);
194
195 let current_process = Pid::from_u32(std::process::id());
196 let Some(process) = system.processes().get(¤t_process) else {
197 let process = current_process;
198 log::error!("Failed to find own process {process:?} in system process table");
199 // TODO: Fire an error telemetry event
200 return;
201 };
202
203 this.report_memory_event(process.memory(), process.virtual_memory());
204 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
205 }
206 })
207 .detach();
208 }
209
210 pub fn set_authenticated_user_info(
211 self: &Arc<Self>,
212 metrics_id: Option<String>,
213 is_staff: bool,
214 ) {
215 let mut state = self.state.lock();
216
217 if !state.settings.metrics {
218 return;
219 }
220
221 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
222 state.metrics_id = metrics_id.clone();
223 state.is_staff = Some(is_staff);
224 drop(state);
225 }
226
227 pub fn report_editor_event(
228 self: &Arc<Self>,
229 file_extension: Option<String>,
230 vim_mode: bool,
231 operation: &'static str,
232 copilot_enabled: bool,
233 copilot_enabled_for_language: bool,
234 ) {
235 let event = Event::Editor(EditorEvent {
236 file_extension,
237 vim_mode,
238 operation: operation.into(),
239 copilot_enabled,
240 copilot_enabled_for_language,
241 });
242
243 self.report_event(event)
244 }
245
246 pub fn report_copilot_event(
247 self: &Arc<Self>,
248 suggestion_id: Option<String>,
249 suggestion_accepted: bool,
250 file_extension: Option<String>,
251 ) {
252 let event = Event::Copilot(CopilotEvent {
253 suggestion_id,
254 suggestion_accepted,
255 file_extension,
256 });
257
258 self.report_event(event)
259 }
260
261 pub fn report_assistant_event(
262 self: &Arc<Self>,
263 conversation_id: Option<String>,
264 kind: AssistantKind,
265 model: String,
266 ) {
267 let event = Event::Assistant(AssistantEvent {
268 conversation_id,
269 kind,
270 model: model.to_string(),
271 });
272
273 self.report_event(event)
274 }
275
276 pub fn report_call_event(
277 self: &Arc<Self>,
278 operation: &'static str,
279 room_id: Option<u64>,
280 channel_id: Option<ChannelId>,
281 ) {
282 let event = Event::Call(CallEvent {
283 operation: operation.to_string(),
284 room_id,
285 channel_id: channel_id.map(|cid| cid.0),
286 });
287
288 self.report_event(event)
289 }
290
291 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
292 let event = Event::Cpu(CpuEvent {
293 usage_as_percentage,
294 core_count,
295 });
296
297 self.report_event(event)
298 }
299
300 pub fn report_memory_event(
301 self: &Arc<Self>,
302 memory_in_bytes: u64,
303 virtual_memory_in_bytes: u64,
304 ) {
305 let event = Event::Memory(MemoryEvent {
306 memory_in_bytes,
307 virtual_memory_in_bytes,
308 });
309
310 self.report_event(event)
311 }
312
313 pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
314 let event = Event::App(AppEvent { operation });
315
316 self.report_event(event.clone());
317
318 event
319 }
320
321 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
322 let event = Event::Setting(SettingEvent {
323 setting: setting.to_string(),
324 value,
325 });
326
327 self.report_event(event)
328 }
329
330 pub fn report_extension_event(self: &Arc<Self>, extension_id: Arc<str>, version: Arc<str>) {
331 self.report_event(Event::Extension(ExtensionEvent {
332 extension_id,
333 version,
334 }))
335 }
336
337 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
338 let mut state = self.state.lock();
339 let period_data = state.event_coalescer.log_event(environment);
340 drop(state);
341
342 if let Some((start, end, environment)) = period_data {
343 let event = Event::Edit(EditEvent {
344 duration: end.timestamp_millis() - start.timestamp_millis(),
345 environment: environment.to_string(),
346 });
347
348 self.report_event(event);
349 }
350 }
351
352 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
353 let event = Event::Action(ActionEvent {
354 source: source.to_string(),
355 action,
356 });
357
358 self.report_event(event)
359 }
360
361 fn report_event(self: &Arc<Self>, event: Event) {
362 let mut state = self.state.lock();
363
364 if !state.settings.metrics {
365 return;
366 }
367
368 if state.flush_events_task.is_none() {
369 let this = self.clone();
370 let executor = self.executor.clone();
371 state.flush_events_task = Some(self.executor.spawn(async move {
372 executor.timer(FLUSH_INTERVAL).await;
373 this.flush_events();
374 }));
375 }
376
377 let date_time = self.clock.utc_now();
378
379 let milliseconds_since_first_event = match state.first_event_date_time {
380 Some(first_event_date_time) => {
381 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
382 }
383 None => {
384 state.first_event_date_time = Some(date_time);
385 0
386 }
387 };
388
389 let signed_in = state.metrics_id.is_some();
390 state.events_queue.push(EventWrapper {
391 signed_in,
392 milliseconds_since_first_event,
393 event,
394 });
395
396 if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
397 drop(state);
398 self.flush_events();
399 }
400 }
401
402 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
403 self.state.lock().metrics_id.clone()
404 }
405
406 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
407 self.state.lock().installation_id.clone()
408 }
409
410 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
411 self.state.lock().is_staff
412 }
413
414 pub fn flush_events(self: &Arc<Self>) {
415 let mut state = self.state.lock();
416 state.first_event_date_time = None;
417 let mut events = mem::take(&mut state.events_queue);
418 state.flush_events_task.take();
419 drop(state);
420 if events.is_empty() {
421 return;
422 }
423
424 let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
425 return;
426 };
427
428 let this = self.clone();
429 self.executor
430 .spawn(
431 async move {
432 let mut json_bytes = Vec::new();
433
434 if let Some(file) = &mut this.state.lock().log_file {
435 let file = file.as_file_mut();
436 for event in &mut events {
437 json_bytes.clear();
438 serde_json::to_writer(&mut json_bytes, event)?;
439 file.write_all(&json_bytes)?;
440 file.write_all(b"\n")?;
441 }
442 }
443
444 {
445 let state = this.state.lock();
446 let request_body = EventRequestBody {
447 installation_id: state.installation_id.as_deref().map(Into::into),
448 session_id: state.session_id.clone(),
449 is_staff: state.is_staff,
450 app_version: state
451 .app_metadata
452 .app_version
453 .unwrap_or_default()
454 .to_string(),
455 os_name: state.app_metadata.os_name.to_string(),
456 os_version: state
457 .app_metadata
458 .os_version
459 .map(|version| version.to_string()),
460 architecture: state.architecture.to_string(),
461
462 release_channel: state.release_channel.map(Into::into),
463 events,
464 };
465 json_bytes.clear();
466 serde_json::to_writer(&mut json_bytes, &request_body)?;
467 }
468
469 let mut summer = Sha256::new();
470 summer.update(checksum_seed);
471 summer.update(&json_bytes);
472 summer.update(checksum_seed);
473 let mut checksum = String::new();
474 for byte in summer.finalize().as_slice() {
475 use std::fmt::Write;
476 write!(&mut checksum, "{:02x}", byte).unwrap();
477 }
478
479 let request = http::Request::builder()
480 .method(Method::POST)
481 .uri(
482 this.http_client
483 .build_zed_api_url("/telemetry/events", &[])?
484 .as_ref(),
485 )
486 .header("Content-Type", "text/plain")
487 .header("x-zed-checksum", checksum)
488 .body(json_bytes.into());
489
490 let response = this.http_client.send(request?).await?;
491 if response.status() != 200 {
492 log::error!("Failed to send events: HTTP {:?}", response.status());
493 }
494 anyhow::Ok(())
495 }
496 .log_err(),
497 )
498 .detach();
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Filling the queue up to `max_queue_size` (with an installation id set) flushes at once.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();
            let expected_event = Event::App(AppEvent {
                operation: operation.clone(),
            });

            // The first three events stay queued and keep the first event's timestamp.
            for expected_len in 1..=3 {
                let reported = telemetry.report_app_event(operation.clone());
                assert_eq!(reported, expected_event);
                {
                    let state = telemetry.state.lock();
                    assert_eq!(state.events_queue.len(), expected_len);
                    assert!(state.flush_events_task.is_some());
                    assert_eq!(state.first_event_date_time, Some(first_date_time));
                }
                clock.advance(chrono::Duration::milliseconds(100));
            }

            // Adding a 4th event should cause a flush
            let reported = telemetry.report_app_event(operation);
            assert_eq!(reported, expected_event);

            assert!(is_empty_state(&telemetry));
        });
    }

    /// A queued event is flushed exactly when `FLUSH_INTERVAL` elapses, not a millisecond sooner.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            let reported = telemetry.report_app_event(operation.clone());
            assert_eq!(reported, Event::App(AppEvent { operation }));
            {
                // Scoped so the lock is released before the clock advances and the flush runs.
                let state = telemetry.state.lock();
                assert_eq!(state.events_queue.len(), 1);
                assert!(state.flush_events_task.is_some());
                assert_eq!(state.first_event_date_time, Some(first_date_time));
            }

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);
            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);
            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` so `TelemetrySettings` can be registered and read.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let store = SettingsStore::test(cx);
            cx.set_global(store);
        });
    }

    /// True when nothing is queued, no flush is pending, and no batch timestamp is recorded.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}