1mod event_coalescer;
2
3use crate::{TelemetrySettings, ZED_SERVER_URL};
4use chrono::{DateTime, Utc};
5use futures::Future;
6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
7use lazy_static::lazy_static;
8use parking_lot::Mutex;
9use serde::Serialize;
10use settings::{Settings, SettingsStore};
11use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
12use sysinfo::{
13 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
14};
15use tempfile::NamedTempFile;
16use util::http::HttpClient;
17#[cfg(not(debug_assertions))]
18use util::ResultExt;
19use util::{channel::ReleaseChannel, TryFutureExt};
20
21use self::event_coalescer::EventCoalescer;
22
23pub struct Telemetry {
24 http_client: Arc<dyn HttpClient>,
25 executor: BackgroundExecutor,
26 state: Arc<Mutex<TelemetryState>>,
27}
28
29struct TelemetryState {
30 settings: TelemetrySettings,
31 metrics_id: Option<Arc<str>>, // Per logged-in user
32 installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
33 session_id: Option<Arc<str>>, // Per app launch
34 release_channel: Option<&'static str>,
35 app_metadata: AppMetadata,
36 architecture: &'static str,
37 events_queue: Vec<EventWrapper>,
38 flush_events_task: Option<Task<()>>,
39 log_file: Option<NamedTempFile>,
40 is_staff: Option<bool>,
41 first_event_date_time: Option<DateTime<Utc>>,
42 event_coalescer: EventCoalescer,
43 max_queue_size: usize,
44}
45
46const EVENTS_URL_PATH: &'static str = "/api/events";
47
48lazy_static! {
49 static ref EVENTS_URL: String = format!("{}{}", *ZED_SERVER_URL, EVENTS_URL_PATH);
50}
51
52#[derive(Serialize, Debug)]
53struct EventRequestBody {
54 installation_id: Option<Arc<str>>,
55 session_id: Option<Arc<str>>,
56 is_staff: Option<bool>,
57 app_version: Option<String>,
58 os_name: &'static str,
59 os_version: Option<String>,
60 architecture: &'static str,
61 release_channel: Option<&'static str>,
62 events: Vec<EventWrapper>,
63}
64
65#[derive(Serialize, Debug)]
66struct EventWrapper {
67 signed_in: bool,
68 #[serde(flatten)]
69 event: Event,
70}
71
72#[derive(Clone, Debug, PartialEq, Serialize)]
73#[serde(rename_all = "snake_case")]
74pub enum AssistantKind {
75 Panel,
76 Inline,
77}
78
79#[derive(Clone, Debug, PartialEq, Serialize)]
80#[serde(tag = "type")]
81pub enum Event {
82 Editor {
83 operation: &'static str,
84 file_extension: Option<String>,
85 vim_mode: bool,
86 copilot_enabled: bool,
87 copilot_enabled_for_language: bool,
88 milliseconds_since_first_event: i64,
89 },
90 Copilot {
91 suggestion_id: Option<String>,
92 suggestion_accepted: bool,
93 file_extension: Option<String>,
94 milliseconds_since_first_event: i64,
95 },
96 Call {
97 operation: &'static str,
98 room_id: Option<u64>,
99 channel_id: Option<u64>,
100 milliseconds_since_first_event: i64,
101 },
102 Assistant {
103 conversation_id: Option<String>,
104 kind: AssistantKind,
105 model: &'static str,
106 milliseconds_since_first_event: i64,
107 },
108 Cpu {
109 usage_as_percentage: f32,
110 core_count: u32,
111 milliseconds_since_first_event: i64,
112 },
113 Memory {
114 memory_in_bytes: u64,
115 virtual_memory_in_bytes: u64,
116 milliseconds_since_first_event: i64,
117 },
118 App {
119 operation: String,
120 milliseconds_since_first_event: i64,
121 },
122 Setting {
123 setting: &'static str,
124 value: String,
125 milliseconds_since_first_event: i64,
126 },
127 Edit {
128 duration: i64,
129 environment: &'static str,
130 milliseconds_since_first_event: i64,
131 },
132 Action {
133 source: &'static str,
134 action: String,
135 milliseconds_since_first_event: i64,
136 },
137}
138
/// Default for `TelemetryState::max_queue_size`: 5 in builds with debug
/// assertions enabled, 50 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };
144
/// Delay between the first queued event and the timed flush: one second in
/// builds with debug assertions enabled, five minutes otherwise.
const FLUSH_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 60 * 5 });
150
151impl Telemetry {
152 pub fn new(client: Arc<dyn HttpClient>, cx: &mut AppContext) -> Arc<Self> {
153 let release_channel = cx
154 .try_global::<ReleaseChannel>()
155 .map(|release_channel| release_channel.display_name());
156
157 TelemetrySettings::register(cx);
158
159 let state = Arc::new(Mutex::new(TelemetryState {
160 settings: TelemetrySettings::get_global(cx).clone(),
161 app_metadata: cx.app_metadata(),
162 architecture: env::consts::ARCH,
163 release_channel,
164 installation_id: None,
165 metrics_id: None,
166 session_id: None,
167 events_queue: Vec::new(),
168 flush_events_task: None,
169 log_file: None,
170 is_staff: None,
171 first_event_date_time: None,
172 event_coalescer: EventCoalescer::new(),
173 max_queue_size: MAX_QUEUE_LEN,
174 }));
175
176 #[cfg(not(debug_assertions))]
177 cx.background_executor()
178 .spawn({
179 let state = state.clone();
180 async move {
181 if let Some(tempfile) =
182 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
183 {
184 state.lock().log_file = Some(tempfile);
185 }
186 }
187 })
188 .detach();
189
190 cx.observe_global::<SettingsStore>({
191 let state = state.clone();
192
193 move |cx| {
194 let mut state = state.lock();
195 state.settings = TelemetrySettings::get_global(cx).clone();
196 }
197 })
198 .detach();
199
200 // TODO: Replace all hardware stuff with nested SystemSpecs json
201 let this = Arc::new(Self {
202 http_client: client,
203 executor: cx.background_executor().clone(),
204 state,
205 });
206
207 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
208 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
209 std::mem::forget(cx.on_app_quit({
210 let this = this.clone();
211 move |_| this.shutdown_telemetry()
212 }));
213
214 this
215 }
216
217 #[cfg(any(test, feature = "test-support"))]
218 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
219 Task::ready(())
220 }
221
222 // Skip calling this function in tests.
223 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
224 #[cfg(not(any(test, feature = "test-support")))]
225 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
226 self.report_app_event("close".to_string());
227 // TODO: close final edit period and make sure it's sent
228 Task::ready(())
229 }
230
231 pub fn log_file_path(&self) -> Option<PathBuf> {
232 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
233 }
234
235 pub fn start(
236 self: &Arc<Self>,
237 installation_id: Option<String>,
238 session_id: String,
239 cx: &mut AppContext,
240 ) {
241 let mut state = self.state.lock();
242 state.installation_id = installation_id.map(|id| id.into());
243 state.session_id = Some(session_id.into());
244 drop(state);
245
246 let this = self.clone();
247 cx.spawn(|_| async move {
248 // Avoiding calling `System::new_all()`, as there have been crashes related to it
249 let refresh_kind = RefreshKind::new()
250 .with_memory() // For memory usage
251 .with_processes(ProcessRefreshKind::everything()) // For process usage
252 .with_cpu(CpuRefreshKind::everything()); // For core count
253
254 let mut system = System::new_with_specifics(refresh_kind);
255
256 // Avoiding calling `refresh_all()`, just update what we need
257 system.refresh_specifics(refresh_kind);
258
259 // Waiting some amount of time before the first query is important to get a reasonable value
260 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
261 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
262
263 loop {
264 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
265
266 system.refresh_specifics(refresh_kind);
267
268 let current_process = Pid::from_u32(std::process::id());
269 let Some(process) = system.processes().get(¤t_process) else {
270 let process = current_process;
271 log::error!("Failed to find own process {process:?} in system process table");
272 // TODO: Fire an error telemetry event
273 return;
274 };
275
276 this.report_memory_event(process.memory(), process.virtual_memory());
277 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
278 }
279 })
280 .detach();
281 }
282
283 pub fn set_authenticated_user_info(
284 self: &Arc<Self>,
285 metrics_id: Option<String>,
286 is_staff: bool,
287 ) {
288 let mut state = self.state.lock();
289
290 if !state.settings.metrics {
291 return;
292 }
293
294 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
295 state.metrics_id = metrics_id.clone();
296 state.is_staff = Some(is_staff);
297 drop(state);
298 }
299
300 pub fn report_editor_event(
301 self: &Arc<Self>,
302 file_extension: Option<String>,
303 vim_mode: bool,
304 operation: &'static str,
305 copilot_enabled: bool,
306 copilot_enabled_for_language: bool,
307 ) {
308 let event = Event::Editor {
309 file_extension,
310 vim_mode,
311 operation,
312 copilot_enabled,
313 copilot_enabled_for_language,
314 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
315 };
316
317 self.report_event(event)
318 }
319
320 pub fn report_copilot_event(
321 self: &Arc<Self>,
322 suggestion_id: Option<String>,
323 suggestion_accepted: bool,
324 file_extension: Option<String>,
325 ) {
326 let event = Event::Copilot {
327 suggestion_id,
328 suggestion_accepted,
329 file_extension,
330 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
331 };
332
333 self.report_event(event)
334 }
335
336 pub fn report_assistant_event(
337 self: &Arc<Self>,
338 conversation_id: Option<String>,
339 kind: AssistantKind,
340 model: &'static str,
341 ) {
342 let event = Event::Assistant {
343 conversation_id,
344 kind,
345 model,
346 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
347 };
348
349 self.report_event(event)
350 }
351
352 pub fn report_call_event(
353 self: &Arc<Self>,
354 operation: &'static str,
355 room_id: Option<u64>,
356 channel_id: Option<u64>,
357 ) {
358 let event = Event::Call {
359 operation,
360 room_id,
361 channel_id,
362 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
363 };
364
365 self.report_event(event)
366 }
367
368 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
369 let event = Event::Cpu {
370 usage_as_percentage,
371 core_count,
372 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
373 };
374
375 self.report_event(event)
376 }
377
378 pub fn report_memory_event(
379 self: &Arc<Self>,
380 memory_in_bytes: u64,
381 virtual_memory_in_bytes: u64,
382 ) {
383 let event = Event::Memory {
384 memory_in_bytes,
385 virtual_memory_in_bytes,
386 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
387 };
388
389 self.report_event(event)
390 }
391
392 pub fn report_app_event(self: &Arc<Self>, operation: String) {
393 self.report_app_event_with_date_time(operation, Utc::now());
394 }
395
396 fn report_app_event_with_date_time(
397 self: &Arc<Self>,
398 operation: String,
399 date_time: DateTime<Utc>,
400 ) -> Event {
401 let event = Event::App {
402 operation,
403 milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
404 };
405
406 self.report_event(event.clone());
407
408 event
409 }
410
411 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
412 let event = Event::Setting {
413 setting,
414 value,
415 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
416 };
417
418 self.report_event(event)
419 }
420
421 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
422 let mut state = self.state.lock();
423 let period_data = state.event_coalescer.log_event(environment);
424 drop(state);
425
426 if let Some((start, end, environment)) = period_data {
427 let event = Event::Edit {
428 duration: end.timestamp_millis() - start.timestamp_millis(),
429 environment,
430 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
431 };
432
433 self.report_event(event);
434 }
435 }
436
437 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
438 let event = Event::Action {
439 source,
440 action,
441 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
442 };
443
444 self.report_event(event)
445 }
446
447 fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
448 let mut state = self.state.lock();
449
450 match state.first_event_date_time {
451 Some(first_event_date_time) => {
452 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
453 }
454 None => {
455 state.first_event_date_time = Some(date_time);
456 0
457 }
458 }
459 }
460
461 fn report_event(self: &Arc<Self>, event: Event) {
462 let mut state = self.state.lock();
463
464 if !state.settings.metrics {
465 return;
466 }
467
468 if state.flush_events_task.is_none() {
469 let this = self.clone();
470 let executor = self.executor.clone();
471 state.flush_events_task = Some(self.executor.spawn(async move {
472 executor.timer(FLUSH_INTERVAL).await;
473 this.flush_events();
474 }));
475 }
476
477 let signed_in = state.metrics_id.is_some();
478 state.events_queue.push(EventWrapper { signed_in, event });
479
480 if state.installation_id.is_some() {
481 if state.events_queue.len() >= state.max_queue_size {
482 drop(state);
483 self.flush_events();
484 }
485 }
486 }
487
488 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
489 self.state.lock().metrics_id.clone()
490 }
491
492 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
493 self.state.lock().installation_id.clone()
494 }
495
496 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
497 self.state.lock().is_staff
498 }
499
500 pub fn flush_events(self: &Arc<Self>) {
501 let mut state = self.state.lock();
502 state.first_event_date_time = None;
503 let mut events = mem::take(&mut state.events_queue);
504 state.flush_events_task.take();
505 drop(state);
506 if events.is_empty() {
507 return;
508 }
509
510 let this = self.clone();
511 self.executor
512 .spawn(
513 async move {
514 let mut json_bytes = Vec::new();
515
516 if let Some(file) = &mut this.state.lock().log_file {
517 let file = file.as_file_mut();
518 for event in &mut events {
519 json_bytes.clear();
520 serde_json::to_writer(&mut json_bytes, event)?;
521 file.write_all(&json_bytes)?;
522 file.write(b"\n")?;
523 }
524 }
525
526 {
527 let state = this.state.lock();
528 let request_body = EventRequestBody {
529 installation_id: state.installation_id.clone(),
530 session_id: state.session_id.clone(),
531 is_staff: state.is_staff.clone(),
532 app_version: state
533 .app_metadata
534 .app_version
535 .map(|version| version.to_string()),
536 os_name: state.app_metadata.os_name,
537 os_version: state
538 .app_metadata
539 .os_version
540 .map(|version| version.to_string()),
541 architecture: state.architecture,
542
543 release_channel: state.release_channel,
544 events,
545 };
546 json_bytes.clear();
547 serde_json::to_writer(&mut json_bytes, &request_body)?;
548 }
549
550 this.http_client
551 .post_json(EVENTS_URL.as_str(), json_bytes.into())
552 .await?;
553 anyhow::Ok(())
554 }
555 .log_err(),
556 )
557 .detach();
558 }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Queues events up to the configured maximum and checks that reaching the
    /// maximum flushes the queue.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            // The first three events, spaced 100ms apart, only accumulate in the queue.
            for (queued_events, elapsed_ms) in [(1_usize, 0_i64), (2, 100), (3, 200)] {
                let date_time = first_date_time + chrono::Duration::milliseconds(elapsed_ms);

                let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
                assert_eq!(
                    event,
                    Event::App {
                        operation: operation.clone(),
                        milliseconds_since_first_event: elapsed_ms
                    }
                );
                assert_pending(&telemetry, queued_events, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let date_time = first_date_time + chrono::Duration::milliseconds(300);
            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 300
                }
            );

            assert!(is_empty_state(&telemetry));
        });
    }

    /// Queues a single event and checks that it is flushed exactly when the
    /// flush interval elapses, not before.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            let event =
                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 0
                }
            );
            assert_pending(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` as a global.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Asserts that `queued_events` events are waiting, a flush task is
    /// scheduled, and the first-event timestamp equals `first_date_time`.
    fn assert_pending(
        telemetry: &Telemetry,
        queued_events: usize,
        first_date_time: DateTime<Utc>,
    ) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), queued_events);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// Returns `true` when nothing is queued, no flush task is scheduled, and
    /// no first-event timestamp is recorded.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}