1mod event_coalescer;
2
3use crate::TelemetrySettings;
4use chrono::{DateTime, Utc};
5use futures::Future;
6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
7use parking_lot::Mutex;
8use serde::Serialize;
9use settings::{Settings, SettingsStore};
10use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
11use sysinfo::{
12 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
13};
14use tempfile::NamedTempFile;
15use util::http::{HttpClient, ZedHttpClient};
16#[cfg(not(debug_assertions))]
17use util::ResultExt;
18use util::{channel::ReleaseChannel, TryFutureExt};
19
20use self::event_coalescer::EventCoalescer;
21
/// Collects telemetry events in memory and periodically uploads them.
///
/// Instances are handed out as `Arc<Telemetry>` (see `Telemetry::new`); all
/// mutable data lives in the shared, mutex-protected `TelemetryState`.
pub struct Telemetry {
    /// Client used to POST the serialized events to `/api/events`.
    http_client: Arc<ZedHttpClient>,
    /// Background executor that runs the flush timer and the upload task.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with the tasks spawned by this type.
    state: Arc<Mutex<TelemetryState>>,
}
27
/// Mutable state behind `Telemetry::state`.
struct TelemetryState {
    /// Copy of the global `TelemetrySettings`, refreshed whenever the
    /// `SettingsStore` global changes.
    settings: TelemetrySettings,
    /// Per logged-in user.
    metrics_id: Option<Arc<str>>,
    /// Per app installation (different for dev, nightly, preview, and stable).
    installation_id: Option<Arc<str>>,
    /// Per app launch.
    session_id: Option<Arc<str>>,
    /// Display name of the `ReleaseChannel` global, if one was set at construction.
    release_channel: Option<&'static str>,
    /// Application metadata captured from the `AppContext` at construction.
    app_metadata: AppMetadata,
    /// Value of `std::env::consts::ARCH`.
    architecture: &'static str,
    /// Events reported since the last flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`.
    flush_events_task: Option<Task<()>>,
    /// Temporary file that flushed events are appended to, one JSON document
    /// per line. Only created when debug assertions are disabled.
    log_file: Option<NamedTempFile>,
    /// Staff flag supplied through `set_authenticated_user_info`.
    is_staff: Option<bool>,
    /// Timestamp of the first event seen since the last flush; the reference
    /// point for every `milliseconds_since_first_event` value.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Groups edit activity into periods for `Event::Edit`.
    event_coalescer: EventCoalescer,
    /// Queue length at which `report_event` flushes immediately (once an
    /// installation id is known). Initialized from `MAX_QUEUE_LEN`.
    max_queue_size: usize,
}
44
/// JSON payload posted to `/api/events` by `Telemetry::flush_events`.
///
/// Every field except `events` is copied from `TelemetryState` at flush time.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    app_version: Option<String>,
    os_name: &'static str,
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    /// The batch of events drained from the queue for this request.
    events: Vec<EventWrapper>,
}
57
/// A queued event together with the sign-in status at the time it was reported.
#[derive(Serialize, Debug)]
struct EventWrapper {
    /// `true` when a metrics id was set at the moment the event was queued.
    signed_in: bool,
    /// The event itself; its fields are flattened into the wrapper's JSON object.
    #[serde(flatten)]
    event: Event,
}
64
/// Which assistant surface an `Event::Assistant` refers to.
///
/// Variant names are serialized in snake_case (`panel`, `inline`).
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
71
/// A single telemetry event.
///
/// Serialized with a `type` field carrying the variant name. Every variant
/// has a `milliseconds_since_first_event` field, which is the offset from the
/// first event recorded since the last flush (0 for that first event).
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// Built by `Telemetry::report_editor_event`.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_copilot_event`.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_call_event`.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_assistant_event`.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_cpu_event`; the periodic task in
    /// `Telemetry::start` fills it from the current process's CPU usage and
    /// the number of CPUs reported by `sysinfo`.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_memory_event`; the periodic task in
    /// `Telemetry::start` fills it from the current process's memory and
    /// virtual memory as reported by `sysinfo`.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_app_event` (e.g. the `"close"` operation
    /// reported on shutdown).
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_setting_event`.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::log_edit_event` when the event coalescer returns
    /// a period.
    Edit {
        /// Length of the period in milliseconds (`end - start`).
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Built by `Telemetry::report_action_event`.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
131
/// Initial value of `TelemetryState::max_queue_size`: the queue length at
/// which events are flushed without waiting for the timer. Kept small when
/// debug assertions are enabled.
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Initial value of `TelemetryState::max_queue_size`: the queue length at
/// which events are flushed without waiting for the timer.
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between queueing the first event of a batch and the timed flush.
/// One second when debug assertions are enabled.
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between queueing the first event of a batch and the timed flush.
/// Five minutes when debug assertions are disabled.
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
143
144impl Telemetry {
145 pub fn new(client: Arc<ZedHttpClient>, cx: &mut AppContext) -> Arc<Self> {
146 let release_channel = cx
147 .try_global::<ReleaseChannel>()
148 .map(|release_channel| release_channel.display_name());
149
150 TelemetrySettings::register(cx);
151
152 let state = Arc::new(Mutex::new(TelemetryState {
153 settings: TelemetrySettings::get_global(cx).clone(),
154 app_metadata: cx.app_metadata(),
155 architecture: env::consts::ARCH,
156 release_channel,
157 installation_id: None,
158 metrics_id: None,
159 session_id: None,
160 events_queue: Vec::new(),
161 flush_events_task: None,
162 log_file: None,
163 is_staff: None,
164 first_event_date_time: None,
165 event_coalescer: EventCoalescer::new(),
166 max_queue_size: MAX_QUEUE_LEN,
167 }));
168
169 #[cfg(not(debug_assertions))]
170 cx.background_executor()
171 .spawn({
172 let state = state.clone();
173 async move {
174 if let Some(tempfile) =
175 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
176 {
177 state.lock().log_file = Some(tempfile);
178 }
179 }
180 })
181 .detach();
182
183 cx.observe_global::<SettingsStore>({
184 let state = state.clone();
185
186 move |cx| {
187 let mut state = state.lock();
188 state.settings = TelemetrySettings::get_global(cx).clone();
189 }
190 })
191 .detach();
192
193 // TODO: Replace all hardware stuff with nested SystemSpecs json
194 let this = Arc::new(Self {
195 http_client: client,
196 executor: cx.background_executor().clone(),
197 state,
198 });
199
200 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
201 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
202 std::mem::forget(cx.on_app_quit({
203 let this = this.clone();
204 move |_| this.shutdown_telemetry()
205 }));
206
207 this
208 }
209
210 #[cfg(any(test, feature = "test-support"))]
211 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
212 Task::ready(())
213 }
214
215 // Skip calling this function in tests.
216 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
217 #[cfg(not(any(test, feature = "test-support")))]
218 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
219 self.report_app_event("close".to_string());
220 // TODO: close final edit period and make sure it's sent
221 Task::ready(())
222 }
223
224 pub fn log_file_path(&self) -> Option<PathBuf> {
225 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
226 }
227
228 pub fn start(
229 self: &Arc<Self>,
230 installation_id: Option<String>,
231 session_id: String,
232 cx: &mut AppContext,
233 ) {
234 let mut state = self.state.lock();
235 state.installation_id = installation_id.map(|id| id.into());
236 state.session_id = Some(session_id.into());
237 drop(state);
238
239 let this = self.clone();
240 cx.spawn(|_| async move {
241 // Avoiding calling `System::new_all()`, as there have been crashes related to it
242 let refresh_kind = RefreshKind::new()
243 .with_memory() // For memory usage
244 .with_processes(ProcessRefreshKind::everything()) // For process usage
245 .with_cpu(CpuRefreshKind::everything()); // For core count
246
247 let mut system = System::new_with_specifics(refresh_kind);
248
249 // Avoiding calling `refresh_all()`, just update what we need
250 system.refresh_specifics(refresh_kind);
251
252 // Waiting some amount of time before the first query is important to get a reasonable value
253 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
254 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
255
256 loop {
257 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
258
259 system.refresh_specifics(refresh_kind);
260
261 let current_process = Pid::from_u32(std::process::id());
262 let Some(process) = system.processes().get(¤t_process) else {
263 let process = current_process;
264 log::error!("Failed to find own process {process:?} in system process table");
265 // TODO: Fire an error telemetry event
266 return;
267 };
268
269 this.report_memory_event(process.memory(), process.virtual_memory());
270 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
271 }
272 })
273 .detach();
274 }
275
276 pub fn set_authenticated_user_info(
277 self: &Arc<Self>,
278 metrics_id: Option<String>,
279 is_staff: bool,
280 ) {
281 let mut state = self.state.lock();
282
283 if !state.settings.metrics {
284 return;
285 }
286
287 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
288 state.metrics_id = metrics_id.clone();
289 state.is_staff = Some(is_staff);
290 drop(state);
291 }
292
293 pub fn report_editor_event(
294 self: &Arc<Self>,
295 file_extension: Option<String>,
296 vim_mode: bool,
297 operation: &'static str,
298 copilot_enabled: bool,
299 copilot_enabled_for_language: bool,
300 ) {
301 let event = Event::Editor {
302 file_extension,
303 vim_mode,
304 operation,
305 copilot_enabled,
306 copilot_enabled_for_language,
307 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
308 };
309
310 self.report_event(event)
311 }
312
313 pub fn report_copilot_event(
314 self: &Arc<Self>,
315 suggestion_id: Option<String>,
316 suggestion_accepted: bool,
317 file_extension: Option<String>,
318 ) {
319 let event = Event::Copilot {
320 suggestion_id,
321 suggestion_accepted,
322 file_extension,
323 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
324 };
325
326 self.report_event(event)
327 }
328
329 pub fn report_assistant_event(
330 self: &Arc<Self>,
331 conversation_id: Option<String>,
332 kind: AssistantKind,
333 model: &'static str,
334 ) {
335 let event = Event::Assistant {
336 conversation_id,
337 kind,
338 model,
339 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
340 };
341
342 self.report_event(event)
343 }
344
345 pub fn report_call_event(
346 self: &Arc<Self>,
347 operation: &'static str,
348 room_id: Option<u64>,
349 channel_id: Option<u64>,
350 ) {
351 let event = Event::Call {
352 operation,
353 room_id,
354 channel_id,
355 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
356 };
357
358 self.report_event(event)
359 }
360
361 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
362 let event = Event::Cpu {
363 usage_as_percentage,
364 core_count,
365 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
366 };
367
368 self.report_event(event)
369 }
370
371 pub fn report_memory_event(
372 self: &Arc<Self>,
373 memory_in_bytes: u64,
374 virtual_memory_in_bytes: u64,
375 ) {
376 let event = Event::Memory {
377 memory_in_bytes,
378 virtual_memory_in_bytes,
379 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
380 };
381
382 self.report_event(event)
383 }
384
385 pub fn report_app_event(self: &Arc<Self>, operation: String) {
386 self.report_app_event_with_date_time(operation, Utc::now());
387 }
388
389 fn report_app_event_with_date_time(
390 self: &Arc<Self>,
391 operation: String,
392 date_time: DateTime<Utc>,
393 ) -> Event {
394 let event = Event::App {
395 operation,
396 milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
397 };
398
399 self.report_event(event.clone());
400
401 event
402 }
403
404 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
405 let event = Event::Setting {
406 setting,
407 value,
408 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
409 };
410
411 self.report_event(event)
412 }
413
414 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
415 let mut state = self.state.lock();
416 let period_data = state.event_coalescer.log_event(environment);
417 drop(state);
418
419 if let Some((start, end, environment)) = period_data {
420 let event = Event::Edit {
421 duration: end.timestamp_millis() - start.timestamp_millis(),
422 environment,
423 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
424 };
425
426 self.report_event(event);
427 }
428 }
429
430 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
431 let event = Event::Action {
432 source,
433 action,
434 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
435 };
436
437 self.report_event(event)
438 }
439
440 fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
441 let mut state = self.state.lock();
442
443 match state.first_event_date_time {
444 Some(first_event_date_time) => {
445 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
446 }
447 None => {
448 state.first_event_date_time = Some(date_time);
449 0
450 }
451 }
452 }
453
454 fn report_event(self: &Arc<Self>, event: Event) {
455 let mut state = self.state.lock();
456
457 if !state.settings.metrics {
458 return;
459 }
460
461 if state.flush_events_task.is_none() {
462 let this = self.clone();
463 let executor = self.executor.clone();
464 state.flush_events_task = Some(self.executor.spawn(async move {
465 executor.timer(FLUSH_INTERVAL).await;
466 this.flush_events();
467 }));
468 }
469
470 let signed_in = state.metrics_id.is_some();
471 state.events_queue.push(EventWrapper { signed_in, event });
472
473 if state.installation_id.is_some() {
474 if state.events_queue.len() >= state.max_queue_size {
475 drop(state);
476 self.flush_events();
477 }
478 }
479 }
480
481 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
482 self.state.lock().metrics_id.clone()
483 }
484
485 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
486 self.state.lock().installation_id.clone()
487 }
488
489 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
490 self.state.lock().is_staff
491 }
492
493 pub fn flush_events(self: &Arc<Self>) {
494 let mut state = self.state.lock();
495 state.first_event_date_time = None;
496 let mut events = mem::take(&mut state.events_queue);
497 state.flush_events_task.take();
498 drop(state);
499 if events.is_empty() {
500 return;
501 }
502
503 let this = self.clone();
504 self.executor
505 .spawn(
506 async move {
507 let mut json_bytes = Vec::new();
508
509 if let Some(file) = &mut this.state.lock().log_file {
510 let file = file.as_file_mut();
511 for event in &mut events {
512 json_bytes.clear();
513 serde_json::to_writer(&mut json_bytes, event)?;
514 file.write_all(&json_bytes)?;
515 file.write(b"\n")?;
516 }
517 }
518
519 {
520 let state = this.state.lock();
521 let request_body = EventRequestBody {
522 installation_id: state.installation_id.clone(),
523 session_id: state.session_id.clone(),
524 is_staff: state.is_staff.clone(),
525 app_version: state
526 .app_metadata
527 .app_version
528 .map(|version| version.to_string()),
529 os_name: state.app_metadata.os_name,
530 os_version: state
531 .app_metadata
532 .os_version
533 .map(|version| version.to_string()),
534 architecture: state.architecture,
535
536 release_channel: state.release_channel,
537 events,
538 };
539 json_bytes.clear();
540 serde_json::to_writer(&mut json_bytes, &request_body)?;
541 }
542
543 this.http_client
544 .post_json(&this.http_client.zed_url("/api/events"), json_bytes.into())
545 .await?;
546 anyhow::Ok(())
547 }
548 .log_err(),
549 )
550 .detach();
551 }
552}
553
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Events below the queue limit stay queued; the event that reaches the
    /// limit triggers an immediate flush.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            // The first three events, 100ms apart, only accumulate in the queue
            for (index, elapsed_ms) in [0i64, 100, 200].into_iter().enumerate() {
                let date_time = first_date_time + chrono::Duration::milliseconds(elapsed_ms);

                let event = telemetry
                    .report_app_event_with_date_time(operation.clone(), date_time);
                assert_eq!(event, app_event(&operation, elapsed_ms));
                assert_pending(&telemetry, index + 1, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let date_time = first_date_time + chrono::Duration::milliseconds(300);
            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(event, app_event(&operation, 300));

            assert!(is_empty_state(&telemetry));
        });
    }

    /// A queued event is flushed once `FLUSH_INTERVAL` has elapsed, and not
    /// a millisecond earlier.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            let event =
                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
            assert_eq!(event, app_event(&operation, 0));
            assert_pending(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` global so `Telemetry::new` can register
    /// and read its settings.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Builds the `Event::App` expected back from `report_app_event_with_date_time`.
    fn app_event(operation: &str, milliseconds_since_first_event: i64) -> Event {
        Event::App {
            operation: operation.to_string(),
            milliseconds_since_first_event,
        }
    }

    /// Asserts that `expected_len` events are queued, a flush is scheduled and
    /// the first-event timestamp is `first_date_time`.
    fn assert_pending(
        telemetry: &Telemetry,
        expected_len: usize,
        first_date_time: DateTime<Utc>,
    ) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued, no flush is scheduled and the first-event
    /// timestamp has been reset.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}