1mod event_coalescer;
2
3use crate::TelemetrySettings;
4use chrono::{DateTime, Utc};
5use futures::Future;
6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
7use once_cell::sync::Lazy;
8use parking_lot::Mutex;
9use release_channel::ReleaseChannel;
10use serde::Serialize;
11use settings::{Settings, SettingsStore};
12use sha2::{Digest, Sha256};
13use std::io::Write;
14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
15use sysinfo::{
16 CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
17};
18use tempfile::NamedTempFile;
19use util::http::{self, HttpClient, Method, ZedHttpClient};
20#[cfg(not(debug_assertions))]
21use util::ResultExt;
22use util::TryFutureExt;
23
24use self::event_coalescer::EventCoalescer;
25
/// Telemetry client that batches [`Event`]s and uploads them to Zed's `/api/events` endpoint.
///
/// Constructed once via `Telemetry::new` and shared as `Arc<Telemetry>`; all mutable data lives
/// in `state` so the client can be used from spawned tasks.
pub struct Telemetry {
    /// Client used to POST event batches in `flush_events`.
    http_client: Arc<ZedHttpClient>,
    /// Executor for the flush timer task and the upload task.
    executor: BackgroundExecutor,
    /// Shared mutable state (queue, ids, settings snapshot, log file, ...).
    state: Arc<Mutex<TelemetryState>>,
}
31
/// Mutable state behind `Telemetry::state`, guarded by a `parking_lot::Mutex`.
struct TelemetryState {
    /// Snapshot of the telemetry settings; refreshed by the `SettingsStore` observer
    /// registered in `Telemetry::new`. `settings.metrics == false` disables event queuing.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>, // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<Arc<str>>, // Per app launch
    /// Display name of the global `ReleaseChannel`, if one is set.
    release_channel: Option<&'static str>,
    app_metadata: AppMetadata,
    /// Target architecture (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting for the next flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes after `FLUSH_INTERVAL`; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Local file that every flushed event is appended to; only created in non-debug builds.
    log_file: Option<NamedTempFile>,
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; reset to `None` on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Collects `log_edit_event` calls and yields `(start, end, environment)` edit periods.
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush (initialized to `MAX_QUEUE_LEN`).
    max_queue_size: usize,
}
48
/// JSON body POSTed to `/api/events` by `Telemetry::flush_events`.
///
/// Field order here is the serialized field order, and the serialized bytes are what the
/// `x-zed-checksum` header is computed over.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    app_version: Option<String>,
    os_name: &'static str,
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    /// The batch of events taken from the queue at flush time.
    events: Vec<EventWrapper>,
}
61
/// A queued event plus per-event context captured when it was reported.
#[derive(Serialize, Debug)]
struct EventWrapper {
    /// Whether a metrics id was set when the event was queued (see `report_event`).
    signed_in: bool,
    /// Flattened: the event's fields (including its `type` tag) are serialized inline
    /// next to `signed_in` rather than nested under an `event` key.
    #[serde(flatten)]
    event: Event,
}
68
/// Which assistant surface produced an [`Event::Assistant`].
///
/// Serialized in snake_case: `"panel"` / `"inline"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
75
/// A single telemetry event.
///
/// Internally tagged: each event serializes as an object whose `"type"` field is the variant
/// name, followed by the variant's fields. Every variant carries
/// `milliseconds_since_first_event`, the offset from the first event of the current batch
/// (see `Telemetry::milliseconds_since_first_event`).
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// Reported by `Telemetry::report_editor_event`.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `Telemetry::report_copilot_event`.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `Telemetry::report_call_event`.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `Telemetry::report_assistant_event`.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// This process's CPU usage; sampled periodically by `Telemetry::start`.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// This process's memory usage; sampled periodically by `Telemetry::start`.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `Telemetry::report_app_event` (e.g. `"close"` on shutdown).
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `Telemetry::report_setting_event`.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// A coalesced editing period from `Telemetry::log_edit_event`; `duration` is in milliseconds.
    Edit {
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `Telemetry::report_action_event`.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
135
/// Number of queued events that triggers an immediate flush (once an installation id is known).
/// Kept small in debug builds so batching is easy to observe during development.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Maximum time an event waits in the queue before a timer-driven flush: one second in debug
/// builds, five minutes otherwise.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
147
148static ZED_CLIENT_CHECKSUM_SEED: Lazy<Vec<u8>> = Lazy::new(|| {
149 option_env!("ZED_CLIENT_CHECKSUM_SEED")
150 .unwrap_or("development-checksum-seed")
151 .as_bytes()
152 .into()
153});
154
155impl Telemetry {
156 pub fn new(client: Arc<ZedHttpClient>, cx: &mut AppContext) -> Arc<Self> {
157 let release_channel =
158 ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
159
160 TelemetrySettings::register(cx);
161
162 let state = Arc::new(Mutex::new(TelemetryState {
163 settings: TelemetrySettings::get_global(cx).clone(),
164 app_metadata: cx.app_metadata(),
165 architecture: env::consts::ARCH,
166 release_channel,
167 installation_id: None,
168 metrics_id: None,
169 session_id: None,
170 events_queue: Vec::new(),
171 flush_events_task: None,
172 log_file: None,
173 is_staff: None,
174 first_event_date_time: None,
175 event_coalescer: EventCoalescer::new(),
176 max_queue_size: MAX_QUEUE_LEN,
177 }));
178
179 #[cfg(not(debug_assertions))]
180 cx.background_executor()
181 .spawn({
182 let state = state.clone();
183 async move {
184 if let Some(tempfile) =
185 NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
186 {
187 state.lock().log_file = Some(tempfile);
188 }
189 }
190 })
191 .detach();
192
193 cx.observe_global::<SettingsStore>({
194 let state = state.clone();
195
196 move |cx| {
197 let mut state = state.lock();
198 state.settings = TelemetrySettings::get_global(cx).clone();
199 }
200 })
201 .detach();
202
203 // TODO: Replace all hardware stuff with nested SystemSpecs json
204 let this = Arc::new(Self {
205 http_client: client,
206 executor: cx.background_executor().clone(),
207 state,
208 });
209
210 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
211 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
212 std::mem::forget(cx.on_app_quit({
213 let this = this.clone();
214 move |_| this.shutdown_telemetry()
215 }));
216
217 this
218 }
219
220 #[cfg(any(test, feature = "test-support"))]
221 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
222 Task::ready(())
223 }
224
225 // Skip calling this function in tests.
226 // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
227 #[cfg(not(any(test, feature = "test-support")))]
228 fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
229 self.report_app_event("close".to_string());
230 // TODO: close final edit period and make sure it's sent
231 Task::ready(())
232 }
233
234 pub fn log_file_path(&self) -> Option<PathBuf> {
235 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
236 }
237
238 pub fn start(
239 self: &Arc<Self>,
240 installation_id: Option<String>,
241 session_id: String,
242 cx: &mut AppContext,
243 ) {
244 let mut state = self.state.lock();
245 state.installation_id = installation_id.map(|id| id.into());
246 state.session_id = Some(session_id.into());
247 drop(state);
248
249 let this = self.clone();
250 cx.spawn(|_| async move {
251 // Avoiding calling `System::new_all()`, as there have been crashes related to it
252 let refresh_kind = RefreshKind::new()
253 .with_memory() // For memory usage
254 .with_processes(ProcessRefreshKind::everything()) // For process usage
255 .with_cpu(CpuRefreshKind::everything()); // For core count
256
257 let mut system = System::new_with_specifics(refresh_kind);
258
259 // Avoiding calling `refresh_all()`, just update what we need
260 system.refresh_specifics(refresh_kind);
261
262 // Waiting some amount of time before the first query is important to get a reasonable value
263 // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
264 const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
265
266 loop {
267 smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
268
269 system.refresh_specifics(refresh_kind);
270
271 let current_process = Pid::from_u32(std::process::id());
272 let Some(process) = system.processes().get(¤t_process) else {
273 let process = current_process;
274 log::error!("Failed to find own process {process:?} in system process table");
275 // TODO: Fire an error telemetry event
276 return;
277 };
278
279 this.report_memory_event(process.memory(), process.virtual_memory());
280 this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
281 }
282 })
283 .detach();
284 }
285
286 pub fn set_authenticated_user_info(
287 self: &Arc<Self>,
288 metrics_id: Option<String>,
289 is_staff: bool,
290 ) {
291 let mut state = self.state.lock();
292
293 if !state.settings.metrics {
294 return;
295 }
296
297 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
298 state.metrics_id = metrics_id.clone();
299 state.is_staff = Some(is_staff);
300 drop(state);
301 }
302
303 pub fn report_editor_event(
304 self: &Arc<Self>,
305 file_extension: Option<String>,
306 vim_mode: bool,
307 operation: &'static str,
308 copilot_enabled: bool,
309 copilot_enabled_for_language: bool,
310 ) {
311 let event = Event::Editor {
312 file_extension,
313 vim_mode,
314 operation,
315 copilot_enabled,
316 copilot_enabled_for_language,
317 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
318 };
319
320 self.report_event(event)
321 }
322
323 pub fn report_copilot_event(
324 self: &Arc<Self>,
325 suggestion_id: Option<String>,
326 suggestion_accepted: bool,
327 file_extension: Option<String>,
328 ) {
329 let event = Event::Copilot {
330 suggestion_id,
331 suggestion_accepted,
332 file_extension,
333 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
334 };
335
336 self.report_event(event)
337 }
338
339 pub fn report_assistant_event(
340 self: &Arc<Self>,
341 conversation_id: Option<String>,
342 kind: AssistantKind,
343 model: &'static str,
344 ) {
345 let event = Event::Assistant {
346 conversation_id,
347 kind,
348 model,
349 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
350 };
351
352 self.report_event(event)
353 }
354
355 pub fn report_call_event(
356 self: &Arc<Self>,
357 operation: &'static str,
358 room_id: Option<u64>,
359 channel_id: Option<u64>,
360 ) {
361 let event = Event::Call {
362 operation,
363 room_id,
364 channel_id,
365 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
366 };
367
368 self.report_event(event)
369 }
370
371 pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
372 let event = Event::Cpu {
373 usage_as_percentage,
374 core_count,
375 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
376 };
377
378 self.report_event(event)
379 }
380
381 pub fn report_memory_event(
382 self: &Arc<Self>,
383 memory_in_bytes: u64,
384 virtual_memory_in_bytes: u64,
385 ) {
386 let event = Event::Memory {
387 memory_in_bytes,
388 virtual_memory_in_bytes,
389 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
390 };
391
392 self.report_event(event)
393 }
394
395 pub fn report_app_event(self: &Arc<Self>, operation: String) {
396 self.report_app_event_with_date_time(operation, Utc::now());
397 }
398
399 fn report_app_event_with_date_time(
400 self: &Arc<Self>,
401 operation: String,
402 date_time: DateTime<Utc>,
403 ) -> Event {
404 let event = Event::App {
405 operation,
406 milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
407 };
408
409 self.report_event(event.clone());
410
411 event
412 }
413
414 pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
415 let event = Event::Setting {
416 setting,
417 value,
418 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
419 };
420
421 self.report_event(event)
422 }
423
424 pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
425 let mut state = self.state.lock();
426 let period_data = state.event_coalescer.log_event(environment);
427 drop(state);
428
429 if let Some((start, end, environment)) = period_data {
430 let event = Event::Edit {
431 duration: end.timestamp_millis() - start.timestamp_millis(),
432 environment,
433 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
434 };
435
436 self.report_event(event);
437 }
438 }
439
440 pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
441 let event = Event::Action {
442 source,
443 action,
444 milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
445 };
446
447 self.report_event(event)
448 }
449
450 fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
451 let mut state = self.state.lock();
452
453 match state.first_event_date_time {
454 Some(first_event_date_time) => {
455 date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
456 }
457 None => {
458 state.first_event_date_time = Some(date_time);
459 0
460 }
461 }
462 }
463
464 fn report_event(self: &Arc<Self>, event: Event) {
465 let mut state = self.state.lock();
466
467 if !state.settings.metrics {
468 return;
469 }
470
471 if state.flush_events_task.is_none() {
472 let this = self.clone();
473 let executor = self.executor.clone();
474 state.flush_events_task = Some(self.executor.spawn(async move {
475 executor.timer(FLUSH_INTERVAL).await;
476 this.flush_events();
477 }));
478 }
479
480 let signed_in = state.metrics_id.is_some();
481 state.events_queue.push(EventWrapper { signed_in, event });
482
483 if state.installation_id.is_some() {
484 if state.events_queue.len() >= state.max_queue_size {
485 drop(state);
486 self.flush_events();
487 }
488 }
489 }
490
491 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
492 self.state.lock().metrics_id.clone()
493 }
494
495 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
496 self.state.lock().installation_id.clone()
497 }
498
499 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
500 self.state.lock().is_staff
501 }
502
503 pub fn flush_events(self: &Arc<Self>) {
504 let mut state = self.state.lock();
505 state.first_event_date_time = None;
506 let mut events = mem::take(&mut state.events_queue);
507 state.flush_events_task.take();
508 drop(state);
509 if events.is_empty() {
510 return;
511 }
512
513 let this = self.clone();
514 self.executor
515 .spawn(
516 async move {
517 let mut json_bytes = Vec::new();
518
519 if let Some(file) = &mut this.state.lock().log_file {
520 let file = file.as_file_mut();
521 for event in &mut events {
522 json_bytes.clear();
523 serde_json::to_writer(&mut json_bytes, event)?;
524 file.write_all(&json_bytes)?;
525 file.write(b"\n")?;
526 }
527 }
528
529 {
530 let state = this.state.lock();
531 let request_body = EventRequestBody {
532 installation_id: state.installation_id.clone(),
533 session_id: state.session_id.clone(),
534 is_staff: state.is_staff.clone(),
535 app_version: state
536 .app_metadata
537 .app_version
538 .map(|version| version.to_string()),
539 os_name: state.app_metadata.os_name,
540 os_version: state
541 .app_metadata
542 .os_version
543 .map(|version| version.to_string()),
544 architecture: state.architecture,
545
546 release_channel: state.release_channel,
547 events,
548 };
549 json_bytes.clear();
550 serde_json::to_writer(&mut json_bytes, &request_body)?;
551 }
552
553 let mut summer = Sha256::new();
554 summer.update(&*ZED_CLIENT_CHECKSUM_SEED);
555 summer.update(&json_bytes);
556 summer.update(&*ZED_CLIENT_CHECKSUM_SEED);
557 let mut checksum = String::new();
558 for byte in summer.finalize().as_slice() {
559 use std::fmt::Write;
560 write!(&mut checksum, "{:02x}", byte).unwrap();
561 }
562
563 let request = http::Request::builder()
564 .method(Method::POST)
565 .uri(&this.http_client.zed_url("/api/events"))
566 .header("Content-Type", "text/plain")
567 .header("x-zed-checksum", checksum)
568 .body(json_bytes.into());
569
570 let response = this.http_client.send(request?).await?;
571 if response.status() != 200 {
572 log::error!("Failed to send events: HTTP {:?}", response.status());
573 }
574 anyhow::Ok(())
575 }
576 .log_err(),
577 )
578 .detach();
579 }
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test";

            // Events 1-3 stay queued, each reported 100ms after the previous one.
            for (queued_events, offset_ms) in [(1, 0), (2, 100), (3, 200)] {
                let date_time = first_date_time + chrono::Duration::milliseconds(offset_ms);
                report_and_check(&telemetry, operation, date_time, offset_ms);
                assert_pending(&telemetry, queued_events, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let date_time = first_date_time + chrono::Duration::milliseconds(300);
            report_and_check(&telemetry, operation, date_time, 300);

            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            report_and_check(&telemetry, "test", first_date_time, 0);
            assert_pending(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);
            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);
            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` so `Telemetry::new` can register and read its settings.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            cx.set_global(SettingsStore::test(cx));
        });
    }

    /// Reports an `App` event at `date_time` and asserts the returned event is stamped
    /// `expected_offset_ms` after the first event of the batch.
    fn report_and_check(
        telemetry: &Arc<Telemetry>,
        operation: &str,
        date_time: DateTime<Utc>,
        expected_offset_ms: i64,
    ) {
        let event = telemetry.report_app_event_with_date_time(operation.to_string(), date_time);
        assert_eq!(
            event,
            Event::App {
                operation: operation.to_string(),
                milliseconds_since_first_event: expected_offset_ms,
            }
        );
    }

    /// Asserts `expected_len` events are queued, a timed flush is scheduled, and the batch
    /// still starts at `first_date_time`.
    fn assert_pending(telemetry: &Telemetry, expected_len: usize, first_date_time: DateTime<Utc>) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued, no flush is scheduled, and no batch has started.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}