1mod event_coalescer;
2
3use crate::{ChannelId, TelemetrySettings};
4use chrono::{DateTime, Utc};
5use clock::SystemClock;
6use futures::Future;
7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
8use once_cell::sync::Lazy;
9use parking_lot::Mutex;
10use release_channel::ReleaseChannel;
11use settings::{Settings, SettingsStore};
12use sha2::{Digest, Sha256};
13use std::io::Write;
14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
15use sysinfo::{CpuRefreshKind, MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
16use telemetry_events::{
17 ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
18 EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
19};
20use tempfile::NamedTempFile;
21use util::http::{self, HttpClient, HttpClientWithUrl, Method};
22#[cfg(not(debug_assertions))]
23use util::ResultExt;
24use util::TryFutureExt;
25
26use self::event_coalescer::EventCoalescer;
27
/// Client-side telemetry collector.
///
/// Events reported through the `report_*` methods are queued in the shared
/// [`TelemetryState`] and later posted to the `/telemetry/events` endpoint by
/// `flush_events`.
pub struct Telemetry {
    /// Time source used to timestamp events.
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to build the telemetry URL and send the request.
    http_client: Arc<HttpClientWithUrl>,
    /// Background executor on which flush timers and uploads are spawned.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with spawned tasks and observers.
    state: Arc<Mutex<TelemetryState>>,
}
34
/// Mutable state behind the [`Telemetry`] mutex.
struct TelemetryState {
    /// Current telemetry settings; refreshed whenever the global `SettingsStore` changes.
    settings: TelemetrySettings,
    /// Per logged-in user.
    metrics_id: Option<Arc<str>>,
    /// Per app installation (different for dev, nightly, preview, and stable).
    installation_id: Option<Arc<str>>,
    /// Per app launch.
    session_id: Option<String>,
    /// Display name of the release channel, when one is registered globally.
    release_channel: Option<&'static str>,
    /// Application metadata captured from the app context at construction time.
    app_metadata: AppMetadata,
    /// CPU architecture, taken from `env::consts::ARCH`.
    architecture: &'static str,
    /// Events recorded since the last flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when no
    /// flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Temporary file that flushed events are also appended to, one JSON document per line.
    /// Only created in builds without debug assertions.
    log_file: Option<NamedTempFile>,
    /// Whether the authenticated user is staff, once known.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; cleared on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Coalesces individual edit notifications into edit periods.
    event_coalescer: EventCoalescer,
    /// Queue length at which an immediate flush is triggered (tests override this).
    max_queue_size: usize,
}
51
/// Default queue length that triggers an immediate flush (builds with debug assertions).
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Default queue length that triggers an immediate flush (builds without debug assertions).
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between the first queued event and the timed flush (builds with debug assertions).
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event and the timed flush (builds without debug
/// assertions): five minutes.
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
63static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
64 option_env!("ZED_CLIENT_CHECKSUM_SEED")
65 .map(|s| s.as_bytes().into())
66 .or_else(|| {
67 env::var("ZED_CLIENT_CHECKSUM_SEED")
68 .ok()
69 .map(|s| s.as_bytes().into())
70 })
71});
72
73impl Telemetry {
74 pub fn new(
75 clock: Arc<dyn SystemClock>,
76 client: Arc<HttpClientWithUrl>,
77 cx: &mut AppContext,
78 ) -> Arc<Self> {
79 let release_channel =
80 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
81
82 TelemetrySettings::register(cx);
83
84 let state = Arc::new(Mutex::new(TelemetryState {
85 settings: *TelemetrySettings::get_global(cx),
86 app_metadata: cx.app_metadata(),
87 architecture: env::consts::ARCH,
88 release_channel,
89 installation_id: None,
90 metrics_id: None,
91 session_id: None,
92 events_queue: Vec::new(),
93 flush_events_task: None,
94 log_file: None,
95 is_staff: None,
96 first_event_date_time: None,
97 event_coalescer: EventCoalescer::new(clock.clone()),
98 max_queue_size: MAX_QUEUE_LEN,
99 }));
100
101 #[cfg(not(debug_assertions))]
102 cx.background_executor()
103 .spawn({
104 let state = state.clone();
105 async move {
106 if let Some(tempfile) =
107 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
108 {
109 state.lock().log_file = Some(tempfile);
110 }
111 }
112 })
113 .detach();
114
115 cx.observe_global::<SettingsStore>({
116 let state = state.clone();
117
118 move |cx| {
119 let mut state = state.lock();
120 state.settings = *TelemetrySettings::get_global(cx);
121 }
122 })
123 .detach();
124
125 // TODO: Replace all hardware stuff with nested SystemSpecs json
126 let this = Arc::new(Self {
127 clock,
128 http_client: client,
129 executor: cx.background_executor().clone(),
130 state,
131 });
132
133 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
134 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
135 std::mem::forget(cx.on_app_quit({
136 let this = this.clone();
137 move |_| this.shutdown_telemetry()
138 }));
139
140 this
141 }
142
143 #[cfg(any(test, feature = "test-support"))]
144 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
145 Task::ready(())
146 }
147
148 // Skip calling this function in tests.
149 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
150 #[cfg(not(any(test, feature = "test-support")))]
151 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
152 self.report_app_event("close".to_string());
153 // TODO: close final edit period and make sure it's sent
154 Task::ready(())
155 }
156
157 pub fn log_file_path(&self) -> Option<PathBuf> {
158 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
159 }
160
161 pub fn start(
162 self: &Arc<Self>,
163 installation_id: Option<String>,
164 session_id: String,
165 cx: &mut AppContext,
166 ) {
167 let mut state = self.state.lock();
168 state.installation_id = installation_id.map(|id| id.into());
169 state.session_id = Some(session_id);
170 drop(state);
171
172 let this = self.clone();
173 cx.spawn(|_| async move {
174 // Avoiding calling `System::new_all()`, as there have been crashes related to it
175 let refresh_kind = RefreshKind::new()
176 .with_memory(MemoryRefreshKind::everything()) // For memory usage
177 .with_processes(ProcessRefreshKind::everything()) // For process usage
178 .with_cpu(CpuRefreshKind::everything()); // For core count
179
180 let mut system = System::new_with_specifics(refresh_kind);
181
182 // Avoiding calling `refresh_all()`, just update what we need
183 system.refresh_specifics(refresh_kind);
184
185 // Waiting some amount of time before the first query is important to get a reasonable value
186 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
187 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
188
189 loop {
190 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
191
192 system.refresh_specifics(refresh_kind);
193
194 let current_process = Pid::from_u32(std::process::id());
195 let Some(process) = system.processes().get(¤t_process) else {
196 let process = current_process;
197 log::error!("Failed to find own process {process:?} in system process table");
198 // TODO: Fire an error telemetry event
199 return;
200 };
201
202 this.report_memory_event(process.memory(), process.virtual_memory());
203 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
204 }
205 })
206 .detach();
207 }
208
209 pub fn set_authenticated_user_info(
210 self: &Arc<Self>,
211 metrics_id: Option<String>,
212 is_staff: bool,
213 ) {
214 let mut state = self.state.lock();
215
216 if !state.settings.metrics {
217 return;
218 }
219
220 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
221 state.metrics_id = metrics_id.clone();
222 state.is_staff = Some(is_staff);
223 drop(state);
224 }
225
226 pub fn report_editor_event(
227 self: &Arc<Self>,
228 file_extension: Option<String>,
229 vim_mode: bool,
230 operation: &'static str,
231 copilot_enabled: bool,
232 copilot_enabled_for_language: bool,
233 ) {
234 let event = Event::Editor(EditorEvent {
235 file_extension,
236 vim_mode,
237 operation: operation.into(),
238 copilot_enabled,
239 copilot_enabled_for_language,
240 });
241
242 self.report_event(event)
243 }
244
245 pub fn report_copilot_event(
246 self: &Arc<Self>,
247 suggestion_id: Option<String>,
248 suggestion_accepted: bool,
249 file_extension: Option<String>,
250 ) {
251 let event = Event::Copilot(CopilotEvent {
252 suggestion_id,
253 suggestion_accepted,
254 file_extension,
255 });
256
257 self.report_event(event)
258 }
259
260 pub fn report_assistant_event(
261 self: &Arc<Self>,
262 conversation_id: Option<String>,
263 kind: AssistantKind,
264 model: String,
265 ) {
266 let event = Event::Assistant(AssistantEvent {
267 conversation_id,
268 kind,
269 model: model.to_string(),
270 });
271
272 self.report_event(event)
273 }
274
275 pub fn report_call_event(
276 self: &Arc<Self>,
277 operation: &'static str,
278 room_id: Option<u64>,
279 channel_id: Option<ChannelId>,
280 ) {
281 let event = Event::Call(CallEvent {
282 operation: operation.to_string(),
283 room_id,
284 channel_id: channel_id.map(|cid| cid.0),
285 });
286
287 self.report_event(event)
288 }
289
290 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
291 let event = Event::Cpu(CpuEvent {
292 usage_as_percentage,
293 core_count,
294 });
295
296 self.report_event(event)
297 }
298
299 pub fn report_memory_event(
300 self: &Arc<Self>,
301 memory_in_bytes: u64,
302 virtual_memory_in_bytes: u64,
303 ) {
304 let event = Event::Memory(MemoryEvent {
305 memory_in_bytes,
306 virtual_memory_in_bytes,
307 });
308
309 self.report_event(event)
310 }
311
312 pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
313 let event = Event::App(AppEvent { operation });
314
315 self.report_event(event.clone());
316
317 event
318 }
319
320 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
321 let event = Event::Setting(SettingEvent {
322 setting: setting.to_string(),
323 value,
324 });
325
326 self.report_event(event)
327 }
328
329 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
330 let mut state = self.state.lock();
331 let period_data = state.event_coalescer.log_event(environment);
332 drop(state);
333
334 if let Some((start, end, environment)) = period_data {
335 let event = Event::Edit(EditEvent {
336 duration: end.timestamp_millis() - start.timestamp_millis(),
337 environment: environment.to_string(),
338 });
339
340 self.report_event(event);
341 }
342 }
343
344 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
345 let event = Event::Action(ActionEvent {
346 source: source.to_string(),
347 action,
348 });
349
350 self.report_event(event)
351 }
352
353 fn report_event(self: &Arc<Self>, event: Event) {
354 let mut state = self.state.lock();
355
356 if !state.settings.metrics {
357 return;
358 }
359
360 if state.flush_events_task.is_none() {
361 let this = self.clone();
362 let executor = self.executor.clone();
363 state.flush_events_task = Some(self.executor.spawn(async move {
364 executor.timer(FLUSH_INTERVAL).await;
365 this.flush_events();
366 }));
367 }
368
369 let date_time = self.clock.utc_now();
370
371 let milliseconds_since_first_event = match state.first_event_date_time {
372 Some(first_event_date_time) => {
373 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
374 }
375 None => {
376 state.first_event_date_time = Some(date_time);
377 0
378 }
379 };
380
381 let signed_in = state.metrics_id.is_some();
382 state.events_queue.push(EventWrapper {
383 signed_in,
384 milliseconds_since_first_event,
385 event,
386 });
387
388 if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
389 drop(state);
390 self.flush_events();
391 }
392 }
393
394 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
395 self.state.lock().metrics_id.clone()
396 }
397
398 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
399 self.state.lock().installation_id.clone()
400 }
401
402 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
403 self.state.lock().is_staff
404 }
405
    /// Drains the event queue and uploads its contents on the background executor.
    ///
    /// The batch reference time and the pending flush task are reset first. Nothing is sent
    /// when the queue is empty or when no checksum seed is available (in the latter case the
    /// drained events are discarded). Otherwise the spawned task appends each event to the
    /// log file (when present) as one JSON document per line, serializes an
    /// `EventRequestBody`, and POSTs it to `/telemetry/events` with an `x-zed-checksum`
    /// header. Errors inside the task are logged via `log_err`.
    pub fn flush_events(self: &Arc<Self>) {
        let mut state = self.state.lock();
        // Start a fresh batch: the next reported event becomes the new reference point.
        state.first_event_date_time = None;
        let mut events = mem::take(&mut state.events_queue);
        // Drop the handle of any scheduled flush; the next reported event schedules a new one.
        state.flush_events_task.take();
        drop(state);
        if events.is_empty() {
            return;
        }

        // Without a seed the request checksum cannot be computed, so nothing is sent.
        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
            return;
        };

        let this = self.clone();
        self.executor
            .spawn(
                async move {
                    // Scratch buffer reused for every serialization below.
                    let mut json_bytes = Vec::new();

                    // Mirror each event into the local log file, one JSON document per line.
                    if let Some(file) = &mut this.state.lock().log_file {
                        let file = file.as_file_mut();
                        for event in &mut events {
                            json_bytes.clear();
                            serde_json::to_writer(&mut json_bytes, event)?;
                            file.write_all(&json_bytes)?;
                            file.write_all(b"\n")?;
                        }
                    }

                    // Build the request body inside its own scope so the state guard is
                    // released before the request is awaited.
                    {
                        let state = this.state.lock();
                        let request_body = EventRequestBody {
                            installation_id: state.installation_id.as_deref().map(Into::into),
                            session_id: state.session_id.clone(),
                            is_staff: state.is_staff,
                            app_version: state
                                .app_metadata
                                .app_version
                                .unwrap_or_default()
                                .to_string(),
                            os_name: state.app_metadata.os_name.to_string(),
                            os_version: state
                                .app_metadata
                                .os_version
                                .map(|version| version.to_string()),
                            architecture: state.architecture.to_string(),

                            release_channel: state.release_channel.map(Into::into),
                            events,
                        };
                        json_bytes.clear();
                        serde_json::to_writer(&mut json_bytes, &request_body)?;
                    }

                    // Checksum is SHA-256 over seed + body + seed, rendered as lowercase hex.
                    let mut summer = Sha256::new();
                    summer.update(checksum_seed);
                    summer.update(&json_bytes);
                    summer.update(checksum_seed);
                    let mut checksum = String::new();
                    for byte in summer.finalize().as_slice() {
                        use std::fmt::Write;
                        write!(&mut checksum, "{:02x}", byte).unwrap();
                    }

                    let request = http::Request::builder()
                        .method(Method::POST)
                        .uri(
                            this.http_client
                                .build_zed_api_url("/telemetry/events", &[])?
                                .as_ref(),
                        )
                        .header("Content-Type", "text/plain")
                        .header("x-zed-checksum", checksum)
                        .body(json_bytes.into());

                    let response = this.http_client.send(request?).await?;
                    // A non-200 response is only logged; the events are not re-queued.
                    if response.status() != 200 {
                        log::error!("Failed to send events: HTTP {:?}", response.status());
                    }
                    anyhow::Ok(())
                }
                .log_err(),
            )
            .detach();
    }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use chrono::TimeZone;
498 use clock::FakeSystemClock;
499 use gpui::TestAppContext;
500 use util::http::FakeHttpClient;
501
502 #[gpui::test]
503 fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
504 init_test(cx);
505 let clock = Arc::new(FakeSystemClock::new(
506 Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
507 ));
508 let http = FakeHttpClient::with_200_response();
509 let installation_id = Some("installation_id".to_string());
510 let session_id = "session_id".to_string();
511
512 cx.update(|cx| {
513 let telemetry = Telemetry::new(clock.clone(), http, cx);
514
515 telemetry.state.lock().max_queue_size = 4;
516 telemetry.start(installation_id, session_id, cx);
517
518 assert!(is_empty_state(&telemetry));
519
520 let first_date_time = clock.utc_now();
521 let operation = "test".to_string();
522
523 let event = telemetry.report_app_event(operation.clone());
524 assert_eq!(
525 event,
526 Event::App(AppEvent {
527 operation: operation.clone(),
528 })
529 );
530 assert_eq!(telemetry.state.lock().events_queue.len(), 1);
531 assert!(telemetry.state.lock().flush_events_task.is_some());
532 assert_eq!(
533 telemetry.state.lock().first_event_date_time,
534 Some(first_date_time)
535 );
536
537 clock.advance(chrono::Duration::milliseconds(100));
538
539 let event = telemetry.report_app_event(operation.clone());
540 assert_eq!(
541 event,
542 Event::App(AppEvent {
543 operation: operation.clone(),
544 })
545 );
546 assert_eq!(telemetry.state.lock().events_queue.len(), 2);
547 assert!(telemetry.state.lock().flush_events_task.is_some());
548 assert_eq!(
549 telemetry.state.lock().first_event_date_time,
550 Some(first_date_time)
551 );
552
553 clock.advance(chrono::Duration::milliseconds(100));
554
555 let event = telemetry.report_app_event(operation.clone());
556 assert_eq!(
557 event,
558 Event::App(AppEvent {
559 operation: operation.clone(),
560 })
561 );
562 assert_eq!(telemetry.state.lock().events_queue.len(), 3);
563 assert!(telemetry.state.lock().flush_events_task.is_some());
564 assert_eq!(
565 telemetry.state.lock().first_event_date_time,
566 Some(first_date_time)
567 );
568
569 clock.advance(chrono::Duration::milliseconds(100));
570
571 // Adding a 4th event should cause a flush
572 let event = telemetry.report_app_event(operation.clone());
573 assert_eq!(
574 event,
575 Event::App(AppEvent {
576 operation: operation.clone(),
577 })
578 );
579
580 assert!(is_empty_state(&telemetry));
581 });
582 }
583
584 #[gpui::test]
585 async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
586 init_test(cx);
587 let clock = Arc::new(FakeSystemClock::new(
588 Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
589 ));
590 let http = FakeHttpClient::with_200_response();
591 let installation_id = Some("installation_id".to_string());
592 let session_id = "session_id".to_string();
593
594 cx.update(|cx| {
595 let telemetry = Telemetry::new(clock.clone(), http, cx);
596 telemetry.state.lock().max_queue_size = 4;
597 telemetry.start(installation_id, session_id, cx);
598
599 assert!(is_empty_state(&telemetry));
600
601 let first_date_time = clock.utc_now();
602 let operation = "test".to_string();
603
604 let event = telemetry.report_app_event(operation.clone());
605 assert_eq!(
606 event,
607 Event::App(AppEvent {
608 operation: operation.clone(),
609 })
610 );
611 assert_eq!(telemetry.state.lock().events_queue.len(), 1);
612 assert!(telemetry.state.lock().flush_events_task.is_some());
613 assert_eq!(
614 telemetry.state.lock().first_event_date_time,
615 Some(first_date_time)
616 );
617
618 let duration = Duration::from_millis(1);
619
620 // Test 1 millisecond before the flush interval limit is met
621 executor.advance_clock(FLUSH_INTERVAL - duration);
622
623 assert!(!is_empty_state(&telemetry));
624
625 // Test the exact moment the flush interval limit is met
626 executor.advance_clock(duration);
627
628 assert!(is_empty_state(&telemetry));
629 });
630 }
631
    // TODO:
    // - Test settings
    // - Update `FakeHttpClient` to keep track of the number of requests and assert on it
635
636 fn init_test(cx: &mut TestAppContext) {
637 cx.update(|cx| {
638 let settings_store = SettingsStore::test(cx);
639 cx.set_global(settings_store);
640 });
641 }
642
643 fn is_empty_state(telemetry: &Telemetry) -> bool {
644 telemetry.state.lock().events_queue.is_empty()
645 && telemetry.state.lock().flush_events_task.is_none()
646 && telemetry.state.lock().first_event_date_time.is_none()
647 }
648}