1mod event_coalescer;
2
3use crate::TelemetrySettings;
4use chrono::{DateTime, Utc};
5use futures::Future;
6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
7use once_cell::sync::Lazy;
8use parking_lot::Mutex;
9use release_channel::ReleaseChannel;
10use serde::Serialize;
11use settings::{Settings, SettingsStore};
12use sha2::{Digest, Sha256};
13use std::io::Write;
14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
15use sysinfo::{
16 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
17};
18use tempfile::NamedTempFile;
19use util::http::{self, HttpClient, Method, ZedHttpClient};
20#[cfg(not(debug_assertions))]
21use util::ResultExt;
22use util::TryFutureExt;
23
24use self::event_coalescer::EventCoalescer;
25
/// Collects telemetry events in memory and sends them in batches to Zed's `/api/events`
/// endpoint.
///
/// All mutable state lives behind `state`, so methods take `self: &Arc<Self>` and can hand
/// clones of the `Arc` to background tasks.
pub struct Telemetry {
    /// Client used to POST batched events.
    http_client: Arc<ZedHttpClient>,
    /// Executor used for the delayed flush timer and for sending requests.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with background tasks and settings observers.
    state: Arc<Mutex<TelemetryState>>,
}
31
/// Mutable state behind [`Telemetry`], guarded by a single mutex.
struct TelemetryState {
    // Snapshot of the current telemetry settings; refreshed whenever the SettingsStore changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>, // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<Arc<str>>, // Per app launch
    release_channel: Option<&'static str>,
    app_metadata: AppMetadata,
    architecture: &'static str,
    // Events waiting to be sent by the next flush.
    events_queue: Vec<EventWrapper>,
    // Pending delayed flush; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    // Only created in release builds; flushed events are appended to it as JSON lines.
    log_file: Option<NamedTempFile>,
    is_staff: Option<bool>,
    // Timestamp of the first event reported since the last flush; event offsets are
    // measured from it.
    first_event_date_time: Option<DateTime<Utc>>,
    // Groups individual edit notifications into periods reported as `Event::Edit`.
    event_coalescer: EventCoalescer,
    // Queue length that triggers an immediate flush once an installation id is known.
    // Tests lower it to keep batches small.
    max_queue_size: usize,
}
48
/// JSON body POSTed to `/api/events` when events are flushed.
///
/// Field order here determines the serialized JSON, which is also the input to the request
/// checksum.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    app_version: Option<String>,
    os_name: &'static str,
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    // The batch of queued events being sent.
    events: Vec<EventWrapper>,
}
61
/// A queued event together with whether a user was signed in when it was queued.
#[derive(Serialize, Debug)]
struct EventWrapper {
    signed_in: bool,
    // The event's own fields (including its `type` tag) are inlined into this object.
    #[serde(flatten)]
    event: Event,
}
68
/// Which assistant surface produced an [`Event::Assistant`].
///
/// Serialized as `"panel"` or `"inline"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
75
/// A single telemetry event.
///
/// Serialized as an internally tagged JSON object. The `type` field holds the variant name
/// (e.g. `"Editor"`), and the variant's fields sit alongside it. Every variant carries
/// `milliseconds_since_first_event`, the offset from the first event reported since the
/// last flush.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// Emitted by `Telemetry::report_editor_event`.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_copilot_event`.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_call_event`.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_assistant_event`.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_cpu_event`, which `Telemetry::start` calls periodically
    /// with this process's CPU usage and the number of CPUs reported by sysinfo.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_memory_event`, which `Telemetry::start` calls
    /// periodically with this process's memory figures.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_app_event` (e.g. `"close"` on shutdown).
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_setting_event`.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::log_edit_event` for a coalesced edit period.
    Edit {
        // Length of the edit period in milliseconds (end - start).
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Emitted by `Telemetry::report_action_event`.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
135
/// Queue length that triggers an immediate flush (once an installation id is known).
/// Debug builds use a much smaller batch than release builds.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Delay between the first event queued after a flush and the automatic flush of the queue.
/// Debug builds flush after one second, release builds after five minutes.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
147
148static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
149 option_env!("ZED_CLIENT_CHECKSUM_SEED")
150 .map(|s| s.as_bytes().into())
151 .or_else(|| {
152 env::var("ZED_CLIENT_CHECKSUM_SEED")
153 .ok()
154 .map(|s| s.as_bytes().into())
155 })
156});
157
158impl Telemetry {
159 pub fn new(client: Arc<ZedHttpClient>, cx: &mut AppContext) -> Arc<Self> {
160 let release_channel =
161 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
162
163 TelemetrySettings::register(cx);
164
165 let state = Arc::new(Mutex::new(TelemetryState {
166 settings: TelemetrySettings::get_global(cx).clone(),
167 app_metadata: cx.app_metadata(),
168 architecture: env::consts::ARCH,
169 release_channel,
170 installation_id: None,
171 metrics_id: None,
172 session_id: None,
173 events_queue: Vec::new(),
174 flush_events_task: None,
175 log_file: None,
176 is_staff: None,
177 first_event_date_time: None,
178 event_coalescer: EventCoalescer::new(),
179 max_queue_size: MAX_QUEUE_LEN,
180 }));
181
182 #[cfg(not(debug_assertions))]
183 cx.background_executor()
184 .spawn({
185 let state = state.clone();
186 async move {
187 if let Some(tempfile) =
188 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
189 {
190 state.lock().log_file = Some(tempfile);
191 }
192 }
193 })
194 .detach();
195
196 cx.observe_global::<SettingsStore>({
197 let state = state.clone();
198
199 move |cx| {
200 let mut state = state.lock();
201 state.settings = TelemetrySettings::get_global(cx).clone();
202 }
203 })
204 .detach();
205
206 // TODO: Replace all hardware stuff with nested SystemSpecs json
207 let this = Arc::new(Self {
208 http_client: client,
209 executor: cx.background_executor().clone(),
210 state,
211 });
212
213 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
214 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
215 std::mem::forget(cx.on_app_quit({
216 let this = this.clone();
217 move |_| this.shutdown_telemetry()
218 }));
219
220 this
221 }
222
223 #[cfg(any(test, feature = "test-support"))]
224 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
225 Task::ready(())
226 }
227
228 // Skip calling this function in tests.
229 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
230 #[cfg(not(any(test, feature = "test-support")))]
231 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
232 self.report_app_event("close".to_string());
233 // TODO: close final edit period and make sure it's sent
234 Task::ready(())
235 }
236
237 pub fn log_file_path(&self) -> Option<PathBuf> {
238 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
239 }
240
241 pub fn start(
242 self: &Arc<Self>,
243 installation_id: Option<String>,
244 session_id: String,
245 cx: &mut AppContext,
246 ) {
247 let mut state = self.state.lock();
248 state.installation_id = installation_id.map(|id| id.into());
249 state.session_id = Some(session_id.into());
250 drop(state);
251
252 let this = self.clone();
253 cx.spawn(|_| async move {
254 // Avoiding calling `System::new_all()`, as there have been crashes related to it
255 let refresh_kind = RefreshKind::new()
256 .with_memory() // For memory usage
257 .with_processes(ProcessRefreshKind::everything()) // For process usage
258 .with_cpu(CpuRefreshKind::everything()); // For core count
259
260 let mut system = System::new_with_specifics(refresh_kind);
261
262 // Avoiding calling `refresh_all()`, just update what we need
263 system.refresh_specifics(refresh_kind);
264
265 // Waiting some amount of time before the first query is important to get a reasonable value
266 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
267 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
268
269 loop {
270 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
271
272 system.refresh_specifics(refresh_kind);
273
274 let current_process = Pid::from_u32(std::process::id());
275 let Some(process) = system.processes().get(¤t_process) else {
276 let process = current_process;
277 log::error!("Failed to find own process {process:?} in system process table");
278 // TODO: Fire an error telemetry event
279 return;
280 };
281
282 this.report_memory_event(process.memory(), process.virtual_memory());
283 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
284 }
285 })
286 .detach();
287 }
288
289 pub fn set_authenticated_user_info(
290 self: &Arc<Self>,
291 metrics_id: Option<String>,
292 is_staff: bool,
293 ) {
294 let mut state = self.state.lock();
295
296 if !state.settings.metrics {
297 return;
298 }
299
300 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
301 state.metrics_id = metrics_id.clone();
302 state.is_staff = Some(is_staff);
303 drop(state);
304 }
305
306 pub fn report_editor_event(
307 self: &Arc<Self>,
308 file_extension: Option<String>,
309 vim_mode: bool,
310 operation: &'static str,
311 copilot_enabled: bool,
312 copilot_enabled_for_language: bool,
313 ) {
314 let event = Event::Editor {
315 file_extension,
316 vim_mode,
317 operation,
318 copilot_enabled,
319 copilot_enabled_for_language,
320 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
321 };
322
323 self.report_event(event)
324 }
325
326 pub fn report_copilot_event(
327 self: &Arc<Self>,
328 suggestion_id: Option<String>,
329 suggestion_accepted: bool,
330 file_extension: Option<String>,
331 ) {
332 let event = Event::Copilot {
333 suggestion_id,
334 suggestion_accepted,
335 file_extension,
336 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
337 };
338
339 self.report_event(event)
340 }
341
342 pub fn report_assistant_event(
343 self: &Arc<Self>,
344 conversation_id: Option<String>,
345 kind: AssistantKind,
346 model: &'static str,
347 ) {
348 let event = Event::Assistant {
349 conversation_id,
350 kind,
351 model,
352 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
353 };
354
355 self.report_event(event)
356 }
357
358 pub fn report_call_event(
359 self: &Arc<Self>,
360 operation: &'static str,
361 room_id: Option<u64>,
362 channel_id: Option<u64>,
363 ) {
364 let event = Event::Call {
365 operation,
366 room_id,
367 channel_id,
368 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
369 };
370
371 self.report_event(event)
372 }
373
374 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
375 let event = Event::Cpu {
376 usage_as_percentage,
377 core_count,
378 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
379 };
380
381 self.report_event(event)
382 }
383
384 pub fn report_memory_event(
385 self: &Arc<Self>,
386 memory_in_bytes: u64,
387 virtual_memory_in_bytes: u64,
388 ) {
389 let event = Event::Memory {
390 memory_in_bytes,
391 virtual_memory_in_bytes,
392 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
393 };
394
395 self.report_event(event)
396 }
397
398 pub fn report_app_event(self: &Arc<Self>, operation: String) {
399 self.report_app_event_with_date_time(operation, Utc::now());
400 }
401
402 fn report_app_event_with_date_time(
403 self: &Arc<Self>,
404 operation: String,
405 date_time: DateTime<Utc>,
406 ) -> Event {
407 let event = Event::App {
408 operation,
409 milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
410 };
411
412 self.report_event(event.clone());
413
414 event
415 }
416
417 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
418 let event = Event::Setting {
419 setting,
420 value,
421 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
422 };
423
424 self.report_event(event)
425 }
426
427 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
428 let mut state = self.state.lock();
429 let period_data = state.event_coalescer.log_event(environment);
430 drop(state);
431
432 if let Some((start, end, environment)) = period_data {
433 let event = Event::Edit {
434 duration: end.timestamp_millis() - start.timestamp_millis(),
435 environment,
436 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
437 };
438
439 self.report_event(event);
440 }
441 }
442
443 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
444 let event = Event::Action {
445 source,
446 action,
447 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
448 };
449
450 self.report_event(event)
451 }
452
453 fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
454 let mut state = self.state.lock();
455
456 match state.first_event_date_time {
457 Some(first_event_date_time) => {
458 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
459 }
460 None => {
461 state.first_event_date_time = Some(date_time);
462 0
463 }
464 }
465 }
466
467 fn report_event(self: &Arc<Self>, event: Event) {
468 let mut state = self.state.lock();
469
470 if !state.settings.metrics {
471 return;
472 }
473
474 if state.flush_events_task.is_none() {
475 let this = self.clone();
476 let executor = self.executor.clone();
477 state.flush_events_task = Some(self.executor.spawn(async move {
478 executor.timer(FLUSH_INTERVAL).await;
479 this.flush_events();
480 }));
481 }
482
483 let signed_in = state.metrics_id.is_some();
484 state.events_queue.push(EventWrapper { signed_in, event });
485
486 if state.installation_id.is_some() {
487 if state.events_queue.len() >= state.max_queue_size {
488 drop(state);
489 self.flush_events();
490 }
491 }
492 }
493
494 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
495 self.state.lock().metrics_id.clone()
496 }
497
498 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
499 self.state.lock().installation_id.clone()
500 }
501
502 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
503 self.state.lock().is_staff
504 }
505
506 pub fn flush_events(self: &Arc<Self>) {
507 let mut state = self.state.lock();
508 state.first_event_date_time = None;
509 let mut events = mem::take(&mut state.events_queue);
510 state.flush_events_task.take();
511 drop(state);
512 if events.is_empty() {
513 return;
514 }
515
516 let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
517 return;
518 };
519
520 let this = self.clone();
521 self.executor
522 .spawn(
523 async move {
524 let mut json_bytes = Vec::new();
525
526 if let Some(file) = &mut this.state.lock().log_file {
527 let file = file.as_file_mut();
528 for event in &mut events {
529 json_bytes.clear();
530 serde_json::to_writer(&mut json_bytes, event)?;
531 file.write_all(&json_bytes)?;
532 file.write(b"\n")?;
533 }
534 }
535
536 {
537 let state = this.state.lock();
538 let request_body = EventRequestBody {
539 installation_id: state.installation_id.clone(),
540 session_id: state.session_id.clone(),
541 is_staff: state.is_staff.clone(),
542 app_version: state
543 .app_metadata
544 .app_version
545 .map(|version| version.to_string()),
546 os_name: state.app_metadata.os_name,
547 os_version: state
548 .app_metadata
549 .os_version
550 .map(|version| version.to_string()),
551 architecture: state.architecture,
552
553 release_channel: state.release_channel,
554 events,
555 };
556 json_bytes.clear();
557 serde_json::to_writer(&mut json_bytes, &request_body)?;
558 }
559
560 let mut summer = Sha256::new();
561 summer.update(checksum_seed);
562 summer.update(&json_bytes);
563 summer.update(checksum_seed);
564 let mut checksum = String::new();
565 for byte in summer.finalize().as_slice() {
566 use std::fmt::Write;
567 write!(&mut checksum, "{:02x}", byte).unwrap();
568 }
569
570 let request = http::Request::builder()
571 .method(Method::POST)
572 .uri(&this.http_client.zed_url("/api/events"))
573 .header("Content-Type", "text/plain")
574 .header("x-zed-checksum", checksum)
575 .body(json_bytes.into());
576
577 let response = this.http_client.send(request?).await?;
578 if response.status() != 200 {
579 log::error!("Failed to send events: HTTP {:?}", response.status());
580 }
581 anyhow::Ok(())
582 }
583 .log_err(),
584 )
585 .detach();
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Reporting `max_queue_size` events, with an installation id set, flushes synchronously
    /// on the last report. The flush clears the queue, the pending flush task and the
    /// first-event timestamp.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            // Lower the threshold so four events are enough to trigger a flush.
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            // The first event becomes the zero point and schedules the delayed flush task.
            let event =
                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 0
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            let mut date_time = first_date_time + chrono::Duration::milliseconds(100);

            // Later events are measured from the first one; the zero point does not move.
            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 100
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            date_time += chrono::Duration::milliseconds(100);

            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 200
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            date_time += chrono::Duration::milliseconds(100);

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 300
                }
            );

            assert!(is_empty_state(&telemetry));
        });
    }

    /// A single queued event is flushed by the delayed task exactly when `FLUSH_INTERVAL`
    /// has elapsed, and not one millisecond earlier.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            let event =
                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 0
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            let duration = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - duration);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(duration);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` global so `Telemetry::new` can read `TelemetrySettings`.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// True when nothing is queued, no flush is scheduled and no first-event timestamp is
    /// recorded, which is the state right after construction or a flush.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        telemetry.state.lock().events_queue.is_empty()
            && telemetry.state.lock().flush_events_task.is_none()
            && telemetry.state.lock().first_event_date_time.is_none()
    }
}