1mod event_coalescer;
2
3use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
4use chrono::{DateTime, Utc};
5use futures::Future;
6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
7use lazy_static::lazy_static;
8use parking_lot::Mutex;
9use serde::Serialize;
10use settings::{Settings, SettingsStore};
11use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
12use sysinfo::{
13 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
14};
15use tempfile::NamedTempFile;
16use util::http::HttpClient;
17#[cfg(not(debug_assertions))]
18use util::ResultExt;
19use util::{channel::ReleaseChannel, TryFutureExt};
20
21use self::event_coalescer::EventCoalescer;
22
/// Collects telemetry events, batches them, and uploads them to `ZED_SERVER_URL`.
///
/// Cloned handles share one [`TelemetryState`] behind a mutex; background tasks spawned by
/// this type (flush timer, uploads, system sampling) capture clones of the `Arc`.
pub struct Telemetry {
    /// Client used to POST event batches to `EVENTS_URL`.
    http_client: Arc<dyn HttpClient>,
    /// Executor that runs the flush timer and upload tasks.
    executor: BackgroundExecutor,
    /// Shared mutable state, also captured by background tasks.
    state: Arc<Mutex<TelemetryState>>,
}
28
/// Mutable telemetry state shared (behind a mutex) between the [`Telemetry`] handle and the
/// background tasks it spawns.
struct TelemetryState {
    /// Snapshot of the user's telemetry settings; refreshed whenever the `SettingsStore` global changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>, // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<Arc<str>>, // Per app launch
    /// Display name of the release channel, if a `ReleaseChannel` global was set at construction.
    release_channel: Option<&'static str>,
    /// App version / OS information captured at construction and sent with every batch.
    app_metadata: AppMetadata,
    /// Target CPU architecture (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting to be sent with the next flush.
    events_queue: Vec<EventWrapper>,
    /// Timer task that flushes the queue after `FLUSH_INTERVAL`; `None` while no batch is open.
    flush_events_task: Option<Task<()>>,
    /// Local file that flushed events are also appended to (only created in release builds).
    log_file: Option<NamedTempFile>,
    /// Whether the authenticated user is staff; `None` until `set_authenticated_user_info` records it.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; reset on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Coalesces `log_edit_event` calls into (start, end, environment) edit periods.
    event_coalescer: EventCoalescer,
    /// Queue length at which a flush is triggered early (once an installation id is known).
    max_queue_size: usize,
}
45
/// Path, appended to `ZED_SERVER_URL`, of the endpoint that receives event batches.
// `const` items are implicitly `'static`; spelling it out is redundant (clippy
// `redundant_static_lifetimes`).
const EVENTS_URL_PATH: &str = "/api/events";
47
48lazy_static! {
49 static ref EVENTS_URL: String = format!("{}{}", *ZED_SERVER_URL, EVENTS_URL_PATH);
50}
51
/// JSON body POSTed to `EVENTS_URL` for each flushed batch of events.
///
/// Field order here is the serialized JSON key order.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    /// Client token (`ZED_SECRET_CLIENT_TOKEN`).
    token: &'static str,
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    app_version: Option<String>,
    os_name: &'static str,
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    /// The batch being uploaded.
    events: Vec<EventWrapper>,
}
65
/// A queued event plus whether a user was signed in when it was recorded.
///
/// `event` is flattened, so its fields (including the `"type"` tag) appear alongside
/// `signed_in` in the same JSON object.
#[derive(Serialize, Debug)]
struct EventWrapper {
    /// `true` if a metrics id had been set when the event was queued.
    signed_in: bool,
    #[serde(flatten)]
    event: Event,
}
72
/// Which assistant surface produced an [`Event::Assistant`].
///
/// Serialized in snake_case, i.e. `"panel"` or `"inline"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
79
/// A single telemetry event.
///
/// Serialized as an internally tagged object whose `"type"` field is the variant name
/// (e.g. `"Editor"`). Every variant carries `milliseconds_since_first_event`, measured from
/// the first event of the current batch (the reference point is reset on each flush).
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// An editor operation, with the file extension and vim/copilot context at that moment.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// A copilot suggestion outcome.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// A call/collaboration operation, optionally tied to a room and channel.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// An assistant interaction.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Periodic CPU sample for this process, reported from `Telemetry::start`.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// Periodic memory sample for this process, reported from `Telemetry::start`.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// An app-level operation (e.g. `"close"` on shutdown).
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// The value of a named setting.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// A coalesced editing period; `duration` is in milliseconds.
    Edit {
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// An action invocation together with where it came from (`source`).
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
139
/// Number of queued events that triggers an early flush (small in debug builds so flushing
/// is easy to observe during development).
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// How long a batch may stay open before it is flushed: 1 second in debug builds,
/// 5 minutes in release builds.
const FLUSH_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 60 * 5 });
151
152impl Telemetry {
153 pub fn new(client: Arc<dyn HttpClient>, cx: &mut AppContext) -> Arc<Self> {
154 let release_channel = cx
155 .try_global::<ReleaseChannel>()
156 .map(|release_channel| release_channel.display_name());
157
158 TelemetrySettings::register(cx);
159
160 let state = Arc::new(Mutex::new(TelemetryState {
161 settings: TelemetrySettings::get_global(cx).clone(),
162 app_metadata: cx.app_metadata(),
163 architecture: env::consts::ARCH,
164 release_channel,
165 installation_id: None,
166 metrics_id: None,
167 session_id: None,
168 events_queue: Vec::new(),
169 flush_events_task: None,
170 log_file: None,
171 is_staff: None,
172 first_event_date_time: None,
173 event_coalescer: EventCoalescer::new(),
174 max_queue_size: MAX_QUEUE_LEN,
175 }));
176
177 #[cfg(not(debug_assertions))]
178 cx.background_executor()
179 .spawn({
180 let state = state.clone();
181 async move {
182 if let Some(tempfile) =
183 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
184 {
185 state.lock().log_file = Some(tempfile);
186 }
187 }
188 })
189 .detach();
190
191 cx.observe_global::<SettingsStore>({
192 let state = state.clone();
193
194 move |cx| {
195 let mut state = state.lock();
196 state.settings = TelemetrySettings::get_global(cx).clone();
197 }
198 })
199 .detach();
200
201 // TODO: Replace all hardware stuff with nested SystemSpecs json
202 let this = Arc::new(Self {
203 http_client: client,
204 executor: cx.background_executor().clone(),
205 state,
206 });
207
208 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
209 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
210 std::mem::forget(cx.on_app_quit({
211 let this = this.clone();
212 move |_| this.shutdown_telemetry()
213 }));
214
215 this
216 }
217
218 #[cfg(any(test, feature = "test-support"))]
219 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
220 Task::ready(())
221 }
222
223 // Skip calling this function in tests.
224 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
225 #[cfg(not(any(test, feature = "test-support")))]
226 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
227 self.report_app_event("close".to_string());
228 // TODO: close final edit period and make sure it's sent
229 Task::ready(())
230 }
231
232 pub fn log_file_path(&self) -> Option<PathBuf> {
233 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
234 }
235
236 pub fn start(
237 self: &Arc<Self>,
238 installation_id: Option<String>,
239 session_id: String,
240 cx: &mut AppContext,
241 ) {
242 let mut state = self.state.lock();
243 state.installation_id = installation_id.map(|id| id.into());
244 state.session_id = Some(session_id.into());
245 drop(state);
246
247 let this = self.clone();
248 cx.spawn(|_| async move {
249 // Avoiding calling `System::new_all()`, as there have been crashes related to it
250 let refresh_kind = RefreshKind::new()
251 .with_memory() // For memory usage
252 .with_processes(ProcessRefreshKind::everything()) // For process usage
253 .with_cpu(CpuRefreshKind::everything()); // For core count
254
255 let mut system = System::new_with_specifics(refresh_kind);
256
257 // Avoiding calling `refresh_all()`, just update what we need
258 system.refresh_specifics(refresh_kind);
259
260 // Waiting some amount of time before the first query is important to get a reasonable value
261 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
262 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
263
264 loop {
265 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
266
267 system.refresh_specifics(refresh_kind);
268
269 let current_process = Pid::from_u32(std::process::id());
270 let Some(process) = system.processes().get(¤t_process) else {
271 let process = current_process;
272 log::error!("Failed to find own process {process:?} in system process table");
273 // TODO: Fire an error telemetry event
274 return;
275 };
276
277 this.report_memory_event(process.memory(), process.virtual_memory());
278 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
279 }
280 })
281 .detach();
282 }
283
284 pub fn set_authenticated_user_info(
285 self: &Arc<Self>,
286 metrics_id: Option<String>,
287 is_staff: bool,
288 ) {
289 let mut state = self.state.lock();
290
291 if !state.settings.metrics {
292 return;
293 }
294
295 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
296 state.metrics_id = metrics_id.clone();
297 state.is_staff = Some(is_staff);
298 drop(state);
299 }
300
301 pub fn report_editor_event(
302 self: &Arc<Self>,
303 file_extension: Option<String>,
304 vim_mode: bool,
305 operation: &'static str,
306 copilot_enabled: bool,
307 copilot_enabled_for_language: bool,
308 ) {
309 let event = Event::Editor {
310 file_extension,
311 vim_mode,
312 operation,
313 copilot_enabled,
314 copilot_enabled_for_language,
315 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
316 };
317
318 self.report_event(event)
319 }
320
321 pub fn report_copilot_event(
322 self: &Arc<Self>,
323 suggestion_id: Option<String>,
324 suggestion_accepted: bool,
325 file_extension: Option<String>,
326 ) {
327 let event = Event::Copilot {
328 suggestion_id,
329 suggestion_accepted,
330 file_extension,
331 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
332 };
333
334 self.report_event(event)
335 }
336
337 pub fn report_assistant_event(
338 self: &Arc<Self>,
339 conversation_id: Option<String>,
340 kind: AssistantKind,
341 model: &'static str,
342 ) {
343 let event = Event::Assistant {
344 conversation_id,
345 kind,
346 model,
347 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
348 };
349
350 self.report_event(event)
351 }
352
353 pub fn report_call_event(
354 self: &Arc<Self>,
355 operation: &'static str,
356 room_id: Option<u64>,
357 channel_id: Option<u64>,
358 ) {
359 let event = Event::Call {
360 operation,
361 room_id,
362 channel_id,
363 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
364 };
365
366 self.report_event(event)
367 }
368
369 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
370 let event = Event::Cpu {
371 usage_as_percentage,
372 core_count,
373 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
374 };
375
376 self.report_event(event)
377 }
378
379 pub fn report_memory_event(
380 self: &Arc<Self>,
381 memory_in_bytes: u64,
382 virtual_memory_in_bytes: u64,
383 ) {
384 let event = Event::Memory {
385 memory_in_bytes,
386 virtual_memory_in_bytes,
387 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
388 };
389
390 self.report_event(event)
391 }
392
393 pub fn report_app_event(self: &Arc<Self>, operation: String) {
394 self.report_app_event_with_date_time(operation, Utc::now());
395 }
396
397 fn report_app_event_with_date_time(
398 self: &Arc<Self>,
399 operation: String,
400 date_time: DateTime<Utc>,
401 ) -> Event {
402 let event = Event::App {
403 operation,
404 milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
405 };
406
407 self.report_event(event.clone());
408
409 event
410 }
411
412 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
413 let event = Event::Setting {
414 setting,
415 value,
416 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
417 };
418
419 self.report_event(event)
420 }
421
422 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
423 let mut state = self.state.lock();
424 let period_data = state.event_coalescer.log_event(environment);
425 drop(state);
426
427 if let Some((start, end, environment)) = period_data {
428 let event = Event::Edit {
429 duration: end.timestamp_millis() - start.timestamp_millis(),
430 environment,
431 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
432 };
433
434 self.report_event(event);
435 }
436 }
437
438 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
439 let event = Event::Action {
440 source,
441 action,
442 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
443 };
444
445 self.report_event(event)
446 }
447
448 fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
449 let mut state = self.state.lock();
450
451 match state.first_event_date_time {
452 Some(first_event_date_time) => {
453 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
454 }
455 None => {
456 state.first_event_date_time = Some(date_time);
457 0
458 }
459 }
460 }
461
462 fn report_event(self: &Arc<Self>, event: Event) {
463 let mut state = self.state.lock();
464
465 if !state.settings.metrics {
466 return;
467 }
468
469 if state.flush_events_task.is_none() {
470 let this = self.clone();
471 let executor = self.executor.clone();
472 state.flush_events_task = Some(self.executor.spawn(async move {
473 executor.timer(FLUSH_INTERVAL).await;
474 this.flush_events();
475 }));
476 }
477
478 let signed_in = state.metrics_id.is_some();
479 state.events_queue.push(EventWrapper { signed_in, event });
480
481 if state.installation_id.is_some() {
482 if state.events_queue.len() >= state.max_queue_size {
483 drop(state);
484 self.flush_events();
485 }
486 }
487 }
488
489 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
490 self.state.lock().metrics_id.clone()
491 }
492
493 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
494 self.state.lock().installation_id.clone()
495 }
496
497 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
498 self.state.lock().is_staff
499 }
500
501 pub fn flush_events(self: &Arc<Self>) {
502 let mut state = self.state.lock();
503 state.first_event_date_time = None;
504 let mut events = mem::take(&mut state.events_queue);
505 state.flush_events_task.take();
506 drop(state);
507 if events.is_empty() {
508 return;
509 }
510
511 let this = self.clone();
512 self.executor
513 .spawn(
514 async move {
515 let mut json_bytes = Vec::new();
516
517 if let Some(file) = &mut this.state.lock().log_file {
518 let file = file.as_file_mut();
519 for event in &mut events {
520 json_bytes.clear();
521 serde_json::to_writer(&mut json_bytes, event)?;
522 file.write_all(&json_bytes)?;
523 file.write(b"\n")?;
524 }
525 }
526
527 {
528 let state = this.state.lock();
529 let request_body = EventRequestBody {
530 token: ZED_SECRET_CLIENT_TOKEN,
531 installation_id: state.installation_id.clone(),
532 session_id: state.session_id.clone(),
533 is_staff: state.is_staff.clone(),
534 app_version: state
535 .app_metadata
536 .app_version
537 .map(|version| version.to_string()),
538 os_name: state.app_metadata.os_name,
539 os_version: state
540 .app_metadata
541 .os_version
542 .map(|version| version.to_string()),
543 architecture: state.architecture,
544
545 release_channel: state.release_channel,
546 events,
547 };
548 json_bytes.clear();
549 serde_json::to_writer(&mut json_bytes, &request_body)?;
550 }
551
552 this.http_client
553 .post_json(EVENTS_URL.as_str(), json_bytes.into())
554 .await?;
555 anyhow::Ok(())
556 }
557 .log_err(),
558 )
559 .detach();
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);

        cx.update(|cx| {
            let telemetry = start_telemetry(cx);
            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test";

            // Below the limit, each event is buffered and timed relative to the first one.
            for (queued, offset_ms) in [(1, 0), (2, 100), (3, 200)] {
                let date_time = first_date_time + chrono::Duration::milliseconds(offset_ms);
                report_and_check(&telemetry, operation, date_time, offset_ms);
                assert_queued(&telemetry, queued, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let date_time = first_date_time + chrono::Duration::milliseconds(300);
            report_and_check(&telemetry, operation, date_time, 300);
            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);

        cx.update(|cx| {
            let telemetry = start_telemetry(cx);
            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();

            report_and_check(&telemetry, "test", first_date_time, 0);
            assert_queued(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);
            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);
            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Creates and starts a `Telemetry` whose queue flushes early at 4 events, and checks
    /// that it begins with an empty state.
    fn start_telemetry(cx: &mut AppContext) -> Arc<Telemetry> {
        let telemetry = Telemetry::new(FakeHttpClient::with_200_response(), cx);
        telemetry.state.lock().max_queue_size = 4;
        telemetry.start(
            Some("installation_id".to_string()),
            "session_id".to_string(),
            cx,
        );
        assert!(is_empty_state(&telemetry));
        telemetry
    }

    /// Reports an `App` event at `date_time` and checks the event that comes back.
    fn report_and_check(
        telemetry: &Arc<Telemetry>,
        operation: &str,
        date_time: DateTime<Utc>,
        expected_milliseconds: i64,
    ) {
        let event = telemetry.report_app_event_with_date_time(operation.to_string(), date_time);
        assert_eq!(
            event,
            Event::App {
                operation: operation.to_string(),
                milliseconds_since_first_event: expected_milliseconds,
            }
        );
    }

    /// Asserts that `expected_len` events are buffered in an open batch that began at
    /// `first_date_time`.
    fn assert_queued(telemetry: &Telemetry, expected_len: usize, first_date_time: DateTime<Utc>) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued and no batch is open.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}