1mod event_coalescer;
2
3use crate::{ChannelId, TelemetrySettings};
4use chrono::{DateTime, Utc};
5use clock::SystemClock;
6use futures::Future;
7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
8use http::{self, HttpClient, HttpClientWithUrl, Method};
9use once_cell::sync::Lazy;
10use parking_lot::Mutex;
11use release_channel::ReleaseChannel;
12use settings::{Settings, SettingsStore};
13use sha2::{Digest, Sha256};
14use std::io::Write;
15use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
16use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
17use telemetry_events::{
18 ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CpuEvent, EditEvent,
19 EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent,
20 MemoryEvent, SettingEvent,
21};
22use tempfile::NamedTempFile;
23#[cfg(not(debug_assertions))]
24use util::ResultExt;
25use util::TryFutureExt;
26
27use self::event_coalescer::EventCoalescer;
28
/// Client-side telemetry: queues events in memory and periodically uploads
/// them in batches.
pub struct Telemetry {
    /// Source of timestamps for queued events; also handed to the edit-event coalescer.
    clock: Arc<dyn SystemClock>,
    /// Client used to POST event batches to the Zed API.
    http_client: Arc<HttpClientWithUrl>,
    /// Executor that runs the flush timer and the upload tasks.
    executor: BackgroundExecutor,
    /// Mutable state shared with background tasks and the settings observer.
    state: Arc<Mutex<TelemetryState>>,
}
35
/// Mutable telemetry state guarded by the mutex in `Telemetry::state`.
struct TelemetryState {
    /// Snapshot of `TelemetrySettings`, refreshed whenever the global `SettingsStore` changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>, // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>, // Per app launch
    /// Display name of the global `ReleaseChannel`, if one was set when telemetry was created.
    release_channel: Option<&'static str>,
    /// App metadata (version, OS name/version) copied into every request body.
    app_metadata: AppMetadata,
    /// CPU architecture string (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting for the next flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when no batch is open.
    flush_events_task: Option<Task<()>>,
    /// Local log that flushed events are appended to; only created in builds without `debug_assertions`.
    log_file: Option<NamedTempFile>,
    /// Whether the signed-in user is staff, as reported by `set_authenticated_user_info`.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; cleared on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Groups individual edit events into periods (see `log_edit_event`).
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush; defaults to `MAX_QUEUE_LEN` (tests lower it).
    max_queue_size: usize,
}
52
/// Queue length at which queued events are flushed immediately
/// (5 with `debug_assertions`, 50 otherwise).
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Maximum time the first event of a batch waits before the batch is flushed
/// (1 second with `debug_assertions`, 5 minutes otherwise).
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(5 * 60)
};
64static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
65 option_env!("ZED_CLIENT_CHECKSUM_SEED")
66 .map(|s| s.as_bytes().into())
67 .or_else(|| {
68 env::var("ZED_CLIENT_CHECKSUM_SEED")
69 .ok()
70 .map(|s| s.as_bytes().into())
71 })
72});
73
74impl Telemetry {
75 pub fn new(
76 clock: Arc<dyn SystemClock>,
77 client: Arc<HttpClientWithUrl>,
78 cx: &mut AppContext,
79 ) -> Arc<Self> {
80 let release_channel =
81 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
82
83 TelemetrySettings::register(cx);
84
85 let state = Arc::new(Mutex::new(TelemetryState {
86 settings: *TelemetrySettings::get_global(cx),
87 app_metadata: cx.app_metadata(),
88 architecture: env::consts::ARCH,
89 release_channel,
90 installation_id: None,
91 metrics_id: None,
92 session_id: None,
93 events_queue: Vec::new(),
94 flush_events_task: None,
95 log_file: None,
96 is_staff: None,
97 first_event_date_time: None,
98 event_coalescer: EventCoalescer::new(clock.clone()),
99 max_queue_size: MAX_QUEUE_LEN,
100 }));
101
102 #[cfg(not(debug_assertions))]
103 cx.background_executor()
104 .spawn({
105 let state = state.clone();
106 async move {
107 if let Some(tempfile) =
108 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
109 {
110 state.lock().log_file = Some(tempfile);
111 }
112 }
113 })
114 .detach();
115
116 cx.observe_global::<SettingsStore>({
117 let state = state.clone();
118
119 move |cx| {
120 let mut state = state.lock();
121 state.settings = *TelemetrySettings::get_global(cx);
122 }
123 })
124 .detach();
125
126 // TODO: Replace all hardware stuff with nested SystemSpecs json
127 let this = Arc::new(Self {
128 clock,
129 http_client: client,
130 executor: cx.background_executor().clone(),
131 state,
132 });
133
134 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
135 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
136 std::mem::forget(cx.on_app_quit({
137 let this = this.clone();
138 move |_| this.shutdown_telemetry()
139 }));
140
141 this
142 }
143
144 #[cfg(any(test, feature = "test-support"))]
145 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
146 Task::ready(())
147 }
148
149 // Skip calling this function in tests.
150 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
151 #[cfg(not(any(test, feature = "test-support")))]
152 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
153 self.report_app_event("close".to_string());
154 // TODO: close final edit period and make sure it's sent
155 Task::ready(())
156 }
157
158 pub fn log_file_path(&self) -> Option<PathBuf> {
159 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
160 }
161
162 pub fn start(
163 self: &Arc<Self>,
164 installation_id: Option<String>,
165 session_id: String,
166 cx: &mut AppContext,
167 ) {
168 let mut state = self.state.lock();
169 state.installation_id = installation_id.map(|id| id.into());
170 state.session_id = Some(session_id);
171 drop(state);
172
173 let this = self.clone();
174 cx.background_executor()
175 .spawn(async move {
176 let mut system = System::new_with_specifics(
177 RefreshKind::new().with_cpu(CpuRefreshKind::everything()),
178 );
179
180 let refresh_kind = ProcessRefreshKind::new().with_cpu().with_memory();
181 let current_process = Pid::from_u32(std::process::id());
182 system.refresh_process_specifics(current_process, refresh_kind);
183
184 // Waiting some amount of time before the first query is important to get a reasonable value
185 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
186 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
187
188 loop {
189 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
190
191 let current_process = Pid::from_u32(std::process::id());
192 system.refresh_process_specifics(current_process, refresh_kind);
193 let Some(process) = system.process(current_process) else {
194 log::error!(
195 "Failed to find own process {current_process:?} in system process table"
196 );
197 // TODO: Fire an error telemetry event
198 return;
199 };
200
201 this.report_memory_event(process.memory(), process.virtual_memory());
202 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
203 }
204 })
205 .detach();
206 }
207
208 pub fn set_authenticated_user_info(
209 self: &Arc<Self>,
210 metrics_id: Option<String>,
211 is_staff: bool,
212 ) {
213 let mut state = self.state.lock();
214
215 if !state.settings.metrics {
216 return;
217 }
218
219 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
220 state.metrics_id.clone_from(&metrics_id);
221 state.is_staff = Some(is_staff);
222 drop(state);
223 }
224
225 pub fn report_editor_event(
226 self: &Arc<Self>,
227 file_extension: Option<String>,
228 vim_mode: bool,
229 operation: &'static str,
230 copilot_enabled: bool,
231 copilot_enabled_for_language: bool,
232 ) {
233 let event = Event::Editor(EditorEvent {
234 file_extension,
235 vim_mode,
236 operation: operation.into(),
237 copilot_enabled,
238 copilot_enabled_for_language,
239 });
240
241 self.report_event(event)
242 }
243
244 pub fn report_inline_completion_event(
245 self: &Arc<Self>,
246 provider: String,
247 suggestion_accepted: bool,
248 file_extension: Option<String>,
249 ) {
250 let event = Event::InlineCompletion(InlineCompletionEvent {
251 provider,
252 suggestion_accepted,
253 file_extension,
254 });
255
256 self.report_event(event)
257 }
258
259 pub fn report_assistant_event(
260 self: &Arc<Self>,
261 conversation_id: Option<String>,
262 kind: AssistantKind,
263 model: String,
264 response_latency: Option<Duration>,
265 error_message: Option<String>,
266 ) {
267 let event = Event::Assistant(AssistantEvent {
268 conversation_id,
269 kind,
270 model: model.to_string(),
271 response_latency,
272 error_message,
273 });
274
275 self.report_event(event)
276 }
277
278 pub fn report_call_event(
279 self: &Arc<Self>,
280 operation: &'static str,
281 room_id: Option<u64>,
282 channel_id: Option<ChannelId>,
283 ) {
284 let event = Event::Call(CallEvent {
285 operation: operation.to_string(),
286 room_id,
287 channel_id: channel_id.map(|cid| cid.0),
288 });
289
290 self.report_event(event)
291 }
292
293 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
294 let event = Event::Cpu(CpuEvent {
295 usage_as_percentage,
296 core_count,
297 });
298
299 self.report_event(event)
300 }
301
302 pub fn report_memory_event(
303 self: &Arc<Self>,
304 memory_in_bytes: u64,
305 virtual_memory_in_bytes: u64,
306 ) {
307 let event = Event::Memory(MemoryEvent {
308 memory_in_bytes,
309 virtual_memory_in_bytes,
310 });
311
312 self.report_event(event)
313 }
314
315 pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
316 let event = Event::App(AppEvent { operation });
317
318 self.report_event(event.clone());
319
320 event
321 }
322
323 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
324 let event = Event::Setting(SettingEvent {
325 setting: setting.to_string(),
326 value,
327 });
328
329 self.report_event(event)
330 }
331
332 pub fn report_extension_event(self: &Arc<Self>, extension_id: Arc<str>, version: Arc<str>) {
333 self.report_event(Event::Extension(ExtensionEvent {
334 extension_id,
335 version,
336 }))
337 }
338
339 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
340 let mut state = self.state.lock();
341 let period_data = state.event_coalescer.log_event(environment);
342 drop(state);
343
344 if let Some((start, end, environment)) = period_data {
345 let event = Event::Edit(EditEvent {
346 duration: end.timestamp_millis() - start.timestamp_millis(),
347 environment: environment.to_string(),
348 });
349
350 self.report_event(event);
351 }
352 }
353
354 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
355 let event = Event::Action(ActionEvent {
356 source: source.to_string(),
357 action,
358 });
359
360 self.report_event(event)
361 }
362
363 fn report_event(self: &Arc<Self>, event: Event) {
364 let mut state = self.state.lock();
365
366 if !state.settings.metrics {
367 return;
368 }
369
370 if state.flush_events_task.is_none() {
371 let this = self.clone();
372 let executor = self.executor.clone();
373 state.flush_events_task = Some(self.executor.spawn(async move {
374 executor.timer(FLUSH_INTERVAL).await;
375 this.flush_events();
376 }));
377 }
378
379 let date_time = self.clock.utc_now();
380
381 let milliseconds_since_first_event = match state.first_event_date_time {
382 Some(first_event_date_time) => {
383 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
384 }
385 None => {
386 state.first_event_date_time = Some(date_time);
387 0
388 }
389 };
390
391 let signed_in = state.metrics_id.is_some();
392 state.events_queue.push(EventWrapper {
393 signed_in,
394 milliseconds_since_first_event,
395 event,
396 });
397
398 if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
399 drop(state);
400 self.flush_events();
401 }
402 }
403
404 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
405 self.state.lock().metrics_id.clone()
406 }
407
408 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
409 self.state.lock().installation_id.clone()
410 }
411
412 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
413 self.state.lock().is_staff
414 }
415
416 pub fn flush_events(self: &Arc<Self>) {
417 let mut state = self.state.lock();
418 state.first_event_date_time = None;
419 let mut events = mem::take(&mut state.events_queue);
420 state.flush_events_task.take();
421 drop(state);
422 if events.is_empty() {
423 return;
424 }
425
426 if ZED_CLIENT_CHECKSUM_SEED.is_none() {
427 return;
428 };
429
430 let this = self.clone();
431 self.executor
432 .spawn(
433 async move {
434 let mut json_bytes = Vec::new();
435
436 if let Some(file) = &mut this.state.lock().log_file {
437 let file = file.as_file_mut();
438 for event in &mut events {
439 json_bytes.clear();
440 serde_json::to_writer(&mut json_bytes, event)?;
441 file.write_all(&json_bytes)?;
442 file.write_all(b"\n")?;
443 }
444 }
445
446 {
447 let state = this.state.lock();
448 let request_body = EventRequestBody {
449 installation_id: state.installation_id.as_deref().map(Into::into),
450 session_id: state.session_id.clone(),
451 is_staff: state.is_staff,
452 app_version: state
453 .app_metadata
454 .app_version
455 .unwrap_or_default()
456 .to_string(),
457 os_name: state.app_metadata.os_name.to_string(),
458 os_version: state
459 .app_metadata
460 .os_version
461 .map(|version| version.to_string()),
462 architecture: state.architecture.to_string(),
463
464 release_channel: state.release_channel.map(Into::into),
465 events,
466 };
467 json_bytes.clear();
468 serde_json::to_writer(&mut json_bytes, &request_body)?;
469 }
470
471 let Some(checksum) = calculate_json_checksum(&json_bytes) else {
472 return Ok(());
473 };
474
475 let request = http::Request::builder()
476 .method(Method::POST)
477 .uri(
478 this.http_client
479 .build_zed_api_url("/telemetry/events", &[])?
480 .as_ref(),
481 )
482 .header("Content-Type", "text/plain")
483 .header("x-zed-checksum", checksum)
484 .body(json_bytes.into());
485
486 let response = this.http_client.send(request?).await?;
487 if response.status() != 200 {
488 log::error!("Failed to send events: HTTP {:?}", response.status());
489 }
490 anyhow::Ok(())
491 }
492 .log_err(),
493 )
494 .detach();
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use http::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();
            let expected_event = Event::App(AppEvent {
                operation: operation.clone(),
            });

            // Events 1..=3 stay queued behind the pending flush timer, all
            // anchored to the timestamp of the first one.
            for expected_len in 1..=3 {
                let event = telemetry.report_app_event(operation.clone());
                assert_eq!(event, expected_event);
                assert_pending_batch(&telemetry, expected_len, first_date_time);

                clock.advance(chrono::Duration::milliseconds(100));
            }

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event(operation);
            assert_eq!(event, expected_event);

            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();
            let expected_event = Event::App(AppEvent {
                operation: operation.clone(),
            });

            let event = telemetry.report_app_event(operation);
            assert_eq!(event, expected_event);
            assert_pending_batch(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` so `TelemetrySettings` can be registered.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            cx.set_global(SettingsStore::test(cx));
        });
    }

    /// Asserts that a batch of `expected_len` events is queued, a flush timer is
    /// pending, and the batch is anchored at `first_date_time`.
    fn assert_pending_batch(
        telemetry: &Telemetry,
        expected_len: usize,
        first_date_time: DateTime<Utc>,
    ) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when no events are queued, no flush is pending, and no batch has started.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}
656
657pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
658 let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
659 return None;
660 };
661
662 let mut summer = Sha256::new();
663 summer.update(checksum_seed);
664 summer.update(&json);
665 summer.update(checksum_seed);
666 let mut checksum = String::new();
667 for byte in summer.finalize().as_slice() {
668 use std::fmt::Write;
669 write!(&mut checksum, "{:02x}", byte).unwrap();
670 }
671
672 Some(checksum)
673}