1mod event_coalescer;
2
3use crate::TelemetrySettings;
4use chrono::{DateTime, Utc};
5use clock::SystemClock;
6use futures::Future;
7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
8use once_cell::sync::Lazy;
9use parking_lot::Mutex;
10use release_channel::ReleaseChannel;
11use serde::Serialize;
12use settings::{Settings, SettingsStore};
13use sha2::{Digest, Sha256};
14use std::io::Write;
15use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
16use sysinfo::{
17 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
18};
19use tempfile::NamedTempFile;
20use util::http::{self, HttpClient, Method, ZedHttpClient};
21#[cfg(not(debug_assertions))]
22use util::ResultExt;
23use util::TryFutureExt;
24
25use self::event_coalescer::EventCoalescer;
26
27pub struct Telemetry {
28 clock: Arc<dyn SystemClock>,
29 http_client: Arc<ZedHttpClient>,
30 executor: BackgroundExecutor,
31 state: Arc<Mutex<TelemetryState>>,
32}
33
34struct TelemetryState {
35 settings: TelemetrySettings,
36 metrics_id: Option<Arc<str>>, // Per logged-in user
37 installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
38 session_id: Option<Arc<str>>, // Per app launch
39 release_channel: Option<&'static str>,
40 app_metadata: AppMetadata,
41 architecture: &'static str,
42 events_queue: Vec<EventWrapper>,
43 flush_events_task: Option<Task<()>>,
44 log_file: Option<NamedTempFile>,
45 is_staff: Option<bool>,
46 first_event_date_time: Option<DateTime<Utc>>,
47 event_coalescer: EventCoalescer,
48 max_queue_size: usize,
49}
50
/// JSON body POSTed to `/api/events` by `Telemetry::flush_events`.
///
/// The `x-zed-checksum` header is computed over the exact serialized bytes of this value.
/// NOTE(review): serde's derive emits fields in declaration order, so reordering fields
/// changes the bytes on the wire.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    // Stringified `AppMetadata::app_version`.
    app_version: Option<String>,
    os_name: &'static str,
    // Stringified `AppMetadata::os_version`.
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    // The batch of events drained from the queue.
    events: Vec<EventWrapper>,
}
63
/// A queued event plus per-event context captured at the time it was queued.
#[derive(Serialize, Debug)]
struct EventWrapper {
    // Whether a metrics id was set when the event was queued.
    signed_in: bool,
    // The event's own fields (including its `type` tag) are serialized inline, next to
    // `signed_in`, rather than nested under an `event` key.
    #[serde(flatten)]
    event: Event,
}
70
71#[derive(Clone, Debug, PartialEq, Serialize)]
72#[serde(rename_all = "snake_case")]
73pub enum AssistantKind {
74 Panel,
75 Inline,
76}
77
/// A single telemetry event.
///
/// Serialized internally tagged: the variant name is written under a `"type"` key alongside
/// the variant's fields. Every variant carries `milliseconds_since_first_event`, measured from
/// the first event reported since the last flush.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    // Built by `Telemetry::report_editor_event`.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::report_copilot_event`.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::report_call_event`.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::report_assistant_event`.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    // Sampled periodically by `Telemetry::start`: this process's CPU usage and the number of
    // CPUs reported by sysinfo.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    // Sampled periodically by `Telemetry::start` from this process's sysinfo entry.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::report_app_event` (e.g. "close" on app quit in non-test builds).
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::report_setting_event`.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::log_edit_event` when the coalescer closes an editing period;
    // `duration` is that period's length in milliseconds.
    Edit {
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    // Built by `Telemetry::report_action_event`.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
137
/// Number of queued events that triggers an immediate flush, once an installation id is known:
/// 5 in debug builds, 50 in release builds.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Delay between the first event queued after a flush and the timer-based flush it schedules:
/// one second in debug builds, five minutes in release builds.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
149
150static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
151 option_env!("ZED_CLIENT_CHECKSUM_SEED")
152 .map(|s| s.as_bytes().into())
153 .or_else(|| {
154 env::var("ZED_CLIENT_CHECKSUM_SEED")
155 .ok()
156 .map(|s| s.as_bytes().into())
157 })
158});
159
160impl Telemetry {
161 pub fn new(
162 clock: Arc<dyn SystemClock>,
163 client: Arc<ZedHttpClient>,
164 cx: &mut AppContext,
165 ) -> Arc<Self> {
166 let release_channel =
167 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
168
169 TelemetrySettings::register(cx);
170
171 let state = Arc::new(Mutex::new(TelemetryState {
172 settings: TelemetrySettings::get_global(cx).clone(),
173 app_metadata: cx.app_metadata(),
174 architecture: env::consts::ARCH,
175 release_channel,
176 installation_id: None,
177 metrics_id: None,
178 session_id: None,
179 events_queue: Vec::new(),
180 flush_events_task: None,
181 log_file: None,
182 is_staff: None,
183 first_event_date_time: None,
184 event_coalescer: EventCoalescer::new(),
185 max_queue_size: MAX_QUEUE_LEN,
186 }));
187
188 #[cfg(not(debug_assertions))]
189 cx.background_executor()
190 .spawn({
191 let state = state.clone();
192 async move {
193 if let Some(tempfile) =
194 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
195 {
196 state.lock().log_file = Some(tempfile);
197 }
198 }
199 })
200 .detach();
201
202 cx.observe_global::<SettingsStore>({
203 let state = state.clone();
204
205 move |cx| {
206 let mut state = state.lock();
207 state.settings = TelemetrySettings::get_global(cx).clone();
208 }
209 })
210 .detach();
211
212 // TODO: Replace all hardware stuff with nested SystemSpecs json
213 let this = Arc::new(Self {
214 clock,
215 http_client: client,
216 executor: cx.background_executor().clone(),
217 state,
218 });
219
220 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
221 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
222 std::mem::forget(cx.on_app_quit({
223 let this = this.clone();
224 move |_| this.shutdown_telemetry()
225 }));
226
227 this
228 }
229
230 #[cfg(any(test, feature = "test-support"))]
231 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
232 Task::ready(())
233 }
234
235 // Skip calling this function in tests.
236 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
237 #[cfg(not(any(test, feature = "test-support")))]
238 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
239 self.report_app_event("close".to_string());
240 // TODO: close final edit period and make sure it's sent
241 Task::ready(())
242 }
243
244 pub fn log_file_path(&self) -> Option<PathBuf> {
245 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
246 }
247
248 pub fn start(
249 self: &Arc<Self>,
250 installation_id: Option<String>,
251 session_id: String,
252 cx: &mut AppContext,
253 ) {
254 let mut state = self.state.lock();
255 state.installation_id = installation_id.map(|id| id.into());
256 state.session_id = Some(session_id.into());
257 drop(state);
258
259 let this = self.clone();
260 cx.spawn(|_| async move {
261 // Avoiding calling `System::new_all()`, as there have been crashes related to it
262 let refresh_kind = RefreshKind::new()
263 .with_memory() // For memory usage
264 .with_processes(ProcessRefreshKind::everything()) // For process usage
265 .with_cpu(CpuRefreshKind::everything()); // For core count
266
267 let mut system = System::new_with_specifics(refresh_kind);
268
269 // Avoiding calling `refresh_all()`, just update what we need
270 system.refresh_specifics(refresh_kind);
271
272 // Waiting some amount of time before the first query is important to get a reasonable value
273 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
274 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
275
276 loop {
277 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
278
279 system.refresh_specifics(refresh_kind);
280
281 let current_process = Pid::from_u32(std::process::id());
282 let Some(process) = system.processes().get(¤t_process) else {
283 let process = current_process;
284 log::error!("Failed to find own process {process:?} in system process table");
285 // TODO: Fire an error telemetry event
286 return;
287 };
288
289 this.report_memory_event(process.memory(), process.virtual_memory());
290 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
291 }
292 })
293 .detach();
294 }
295
296 pub fn set_authenticated_user_info(
297 self: &Arc<Self>,
298 metrics_id: Option<String>,
299 is_staff: bool,
300 ) {
301 let mut state = self.state.lock();
302
303 if !state.settings.metrics {
304 return;
305 }
306
307 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
308 state.metrics_id = metrics_id.clone();
309 state.is_staff = Some(is_staff);
310 drop(state);
311 }
312
313 pub fn report_editor_event(
314 self: &Arc<Self>,
315 file_extension: Option<String>,
316 vim_mode: bool,
317 operation: &'static str,
318 copilot_enabled: bool,
319 copilot_enabled_for_language: bool,
320 ) {
321 let event = Event::Editor {
322 file_extension,
323 vim_mode,
324 operation,
325 copilot_enabled,
326 copilot_enabled_for_language,
327 milliseconds_since_first_event: self
328 .milliseconds_since_first_event(self.clock.utc_now()),
329 };
330
331 self.report_event(event)
332 }
333
334 pub fn report_copilot_event(
335 self: &Arc<Self>,
336 suggestion_id: Option<String>,
337 suggestion_accepted: bool,
338 file_extension: Option<String>,
339 ) {
340 let event = Event::Copilot {
341 suggestion_id,
342 suggestion_accepted,
343 file_extension,
344 milliseconds_since_first_event: self
345 .milliseconds_since_first_event(self.clock.utc_now()),
346 };
347
348 self.report_event(event)
349 }
350
351 pub fn report_assistant_event(
352 self: &Arc<Self>,
353 conversation_id: Option<String>,
354 kind: AssistantKind,
355 model: &'static str,
356 ) {
357 let event = Event::Assistant {
358 conversation_id,
359 kind,
360 model,
361 milliseconds_since_first_event: self
362 .milliseconds_since_first_event(self.clock.utc_now()),
363 };
364
365 self.report_event(event)
366 }
367
368 pub fn report_call_event(
369 self: &Arc<Self>,
370 operation: &'static str,
371 room_id: Option<u64>,
372 channel_id: Option<u64>,
373 ) {
374 let event = Event::Call {
375 operation,
376 room_id,
377 channel_id,
378 milliseconds_since_first_event: self
379 .milliseconds_since_first_event(self.clock.utc_now()),
380 };
381
382 self.report_event(event)
383 }
384
385 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
386 let event = Event::Cpu {
387 usage_as_percentage,
388 core_count,
389 milliseconds_since_first_event: self
390 .milliseconds_since_first_event(self.clock.utc_now()),
391 };
392
393 self.report_event(event)
394 }
395
396 pub fn report_memory_event(
397 self: &Arc<Self>,
398 memory_in_bytes: u64,
399 virtual_memory_in_bytes: u64,
400 ) {
401 let event = Event::Memory {
402 memory_in_bytes,
403 virtual_memory_in_bytes,
404 milliseconds_since_first_event: self
405 .milliseconds_since_first_event(self.clock.utc_now()),
406 };
407
408 self.report_event(event)
409 }
410
411 pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
412 let event = Event::App {
413 operation,
414 milliseconds_since_first_event: self
415 .milliseconds_since_first_event(self.clock.utc_now()),
416 };
417
418 self.report_event(event.clone());
419
420 event
421 }
422
423 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
424 let event = Event::Setting {
425 setting,
426 value,
427 milliseconds_since_first_event: self
428 .milliseconds_since_first_event(self.clock.utc_now()),
429 };
430
431 self.report_event(event)
432 }
433
434 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
435 let mut state = self.state.lock();
436 let period_data = state.event_coalescer.log_event(environment);
437 drop(state);
438
439 if let Some((start, end, environment)) = period_data {
440 let event = Event::Edit {
441 duration: end.timestamp_millis() - start.timestamp_millis(),
442 environment,
443 milliseconds_since_first_event: self
444 .milliseconds_since_first_event(self.clock.utc_now()),
445 };
446
447 self.report_event(event);
448 }
449 }
450
451 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
452 let event = Event::Action {
453 source,
454 action,
455 milliseconds_since_first_event: self
456 .milliseconds_since_first_event(self.clock.utc_now()),
457 };
458
459 self.report_event(event)
460 }
461
462 fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
463 let mut state = self.state.lock();
464
465 match state.first_event_date_time {
466 Some(first_event_date_time) => {
467 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
468 }
469 None => {
470 state.first_event_date_time = Some(date_time);
471 0
472 }
473 }
474 }
475
476 fn report_event(self: &Arc<Self>, event: Event) {
477 let mut state = self.state.lock();
478
479 if !state.settings.metrics {
480 return;
481 }
482
483 if state.flush_events_task.is_none() {
484 let this = self.clone();
485 let executor = self.executor.clone();
486 state.flush_events_task = Some(self.executor.spawn(async move {
487 executor.timer(FLUSH_INTERVAL).await;
488 this.flush_events();
489 }));
490 }
491
492 let signed_in = state.metrics_id.is_some();
493 state.events_queue.push(EventWrapper { signed_in, event });
494
495 if state.installation_id.is_some() {
496 if state.events_queue.len() >= state.max_queue_size {
497 drop(state);
498 self.flush_events();
499 }
500 }
501 }
502
503 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
504 self.state.lock().metrics_id.clone()
505 }
506
507 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
508 self.state.lock().installation_id.clone()
509 }
510
511 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
512 self.state.lock().is_staff
513 }
514
515 pub fn flush_events(self: &Arc<Self>) {
516 let mut state = self.state.lock();
517 state.first_event_date_time = None;
518 let mut events = mem::take(&mut state.events_queue);
519 state.flush_events_task.take();
520 drop(state);
521 if events.is_empty() {
522 return;
523 }
524
525 let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
526 return;
527 };
528
529 let this = self.clone();
530 self.executor
531 .spawn(
532 async move {
533 let mut json_bytes = Vec::new();
534
535 if let Some(file) = &mut this.state.lock().log_file {
536 let file = file.as_file_mut();
537 for event in &mut events {
538 json_bytes.clear();
539 serde_json::to_writer(&mut json_bytes, event)?;
540 file.write_all(&json_bytes)?;
541 file.write(b"\n")?;
542 }
543 }
544
545 {
546 let state = this.state.lock();
547 let request_body = EventRequestBody {
548 installation_id: state.installation_id.clone(),
549 session_id: state.session_id.clone(),
550 is_staff: state.is_staff.clone(),
551 app_version: state
552 .app_metadata
553 .app_version
554 .map(|version| version.to_string()),
555 os_name: state.app_metadata.os_name,
556 os_version: state
557 .app_metadata
558 .os_version
559 .map(|version| version.to_string()),
560 architecture: state.architecture,
561
562 release_channel: state.release_channel,
563 events,
564 };
565 json_bytes.clear();
566 serde_json::to_writer(&mut json_bytes, &request_body)?;
567 }
568
569 let mut summer = Sha256::new();
570 summer.update(checksum_seed);
571 summer.update(&json_bytes);
572 summer.update(checksum_seed);
573 let mut checksum = String::new();
574 for byte in summer.finalize().as_slice() {
575 use std::fmt::Write;
576 write!(&mut checksum, "{:02x}", byte).unwrap();
577 }
578
579 let request = http::Request::builder()
580 .method(Method::POST)
581 .uri(&this.http_client.zed_url("/api/events"))
582 .header("Content-Type", "text/plain")
583 .header("x-zed-checksum", checksum)
584 .body(json_bytes.into());
585
586 let response = this.http_client.send(request?).await?;
587 if response.status() != 200 {
588 log::error!("Failed to send events: HTTP {:?}", response.status());
589 }
590 anyhow::Ok(())
591 }
592 .log_err(),
593 )
594 .detach();
595 }
596}
597
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            // The first three events stay queued; each is timestamped relative to the first.
            for index in 0..3_usize {
                if index > 0 {
                    clock.advance(chrono::Duration::milliseconds(100));
                }

                let event = telemetry.report_app_event(operation.clone());
                assert_eq!(
                    event,
                    Event::App {
                        operation: operation.clone(),
                        milliseconds_since_first_event: 100 * index as i64,
                    }
                );
                assert_queued(&telemetry, index + 1, first_date_time);
            }

            clock.advance(chrono::Duration::milliseconds(100));

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 300
                }
            );

            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 0
                }
            );
            assert_queued(&telemetry, 1, first_date_time);

            let duration = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - duration);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(duration);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Asserts that `expected_len` events are queued, a timer flush is pending, and the
    /// elapsed-time baseline is `first_date_time` — all read under a single lock.
    fn assert_queued(telemetry: &Telemetry, expected_len: usize, first_date_time: DateTime<Utc>) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued, no flush is pending and no baseline is recorded, checked
    /// as one consistent snapshot under a single lock.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}