1mod event_coalescer;
2
3use crate::TelemetrySettings;
4use anyhow::{Context as _, Result};
5use clock::SystemClock;
6use fs::Fs;
7use futures::channel::mpsc;
8use futures::{Future, StreamExt};
9use gpui::{App, AppContext as _, BackgroundExecutor, Task};
10use http_client::{self, AsyncBody, HttpClient, HttpClientWithUrl, Method, Request};
11use parking_lot::Mutex;
12use regex::Regex;
13use release_channel::ReleaseChannel;
14use settings::{Settings, SettingsStore};
15use sha2::{Digest, Sha256};
16use std::collections::HashSet;
17use std::fs::File;
18use std::io::Write;
19use std::sync::LazyLock;
20use std::time::Instant;
21use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
22use telemetry_events::{AssistantEventData, AssistantPhase, Event, EventRequestBody, EventWrapper};
23
/// Snapshot handed to a new telemetry subscriber: previously logged events,
/// events still queued for upload, and a live stream of future events.
pub struct TelemetrySubscription {
    /// Events parsed from the on-disk telemetry log (loading may fail).
    pub historical_events: Result<HistoricalEvents>,
    /// Events accumulated in memory but not yet flushed at subscription time.
    pub queued_events: Vec<EventWrapper>,
    /// Receiver for events reported after the subscription was created.
    pub live_events: mpsc::UnboundedReceiver<EventWrapper>,
}
29
/// Result of reading the on-disk telemetry log file.
pub struct HistoricalEvents {
    /// Successfully parsed events, in file order (oldest first).
    pub events: Vec<EventWrapper>,
    /// Number of non-empty log lines that failed to parse as `EventWrapper` JSON.
    pub parse_error_count: usize,
}
34use util::ResultExt as _;
35use worktree::{UpdatedEntriesSet, WorktreeId};
36
37use self::event_coalescer::EventCoalescer;
38
/// Collects telemetry events, batches them in memory (and on disk), and
/// periodically uploads them to the Zed API over HTTP.
pub struct Telemetry {
    clock: Arc<dyn SystemClock>,
    http_client: Arc<HttpClientWithUrl>,
    executor: BackgroundExecutor,
    // Shared mutable state; the lock is taken briefly on every reported event.
    state: Arc<Mutex<TelemetryState>>,
}
45
struct TelemetryState {
    settings: TelemetrySettings,
    system_id: Option<Arc<str>>,       // Per system
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>,        // Per app launch
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    release_channel: Option<ReleaseChannel>,
    architecture: &'static str,
    events_queue: Vec<EventWrapper>, // Events awaiting upload
    flush_events_task: Option<Task<()>>, // Pending interval-based flush, if any

    log_file: Option<File>, // Local JSON-lines log of all reported events
    is_staff: Option<bool>,
    first_event_date_time: Option<Instant>, // Start of the current batch; timestamps are relative to it
    event_coalescer: EventCoalescer,
    max_queue_size: usize, // Flush threshold; overridden in tests
    worktrees_with_project_type_events_sent: HashSet<WorktreeId>,

    os_name: String,
    app_version: String,
    os_version: Option<String>, // Resolved lazily on a background thread

    subscribers: Vec<mpsc::UnboundedSender<EventWrapper>>, // Live event listeners
}
70
// Smaller queue and shorter flush interval in debug builds so telemetry is
// easy to exercise during development.
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);

// Checksum seed for signing telemetry uploads; the compile-time value wins,
// with a runtime environment variable as fallback.
static ZED_CLIENT_CHECKSUM_SEED: LazyLock<Option<Vec<u8>>> = LazyLock::new(|| {
    option_env!("ZED_CLIENT_CHECKSUM_SEED")
        .map(|s| s.as_bytes().into())
        .or_else(|| {
            env::var("ZED_CLIENT_CHECKSUM_SEED")
                .ok()
                .map(|s| s.as_bytes().into())
        })
});

// Endpoint for uploading crash minidumps; compile-time value wins over the
// runtime environment variable.
pub static MINIDUMP_ENDPOINT: LazyLock<Option<String>> = LazyLock::new(|| {
    option_env!("ZED_MINIDUMP_ENDPOINT")
        .map(str::to_string)
        .or_else(|| env::var("ZED_MINIDUMP_ENDPOINT").ok())
});

// Matches filenames that indicate a .NET project: project/solution files,
// global.json, or Directory.Build.props.
static DOTNET_PROJECT_FILES_REGEX: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"^(global\.json|Directory\.Build\.props|.*\.(csproj|fsproj|vbproj|sln))$").unwrap()
});

// Strips a trailing " (Build …)" suffix that ends in a digit from the macOS
// version string; beta/RSR builds end with a letter and are left unchanged.
#[cfg(target_os = "macos")]
static MACOS_VERSION_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"(\s*\(Build [^)]*[0-9]\))").unwrap());
105
/// Returns a human-readable name for the current operating system.
/// On Linux and FreeBSD this includes the detected display compositor.
/// NOTE(review): there is no fallback arm, so targets other than the four
/// listed here would fail to compile this function — presumably intentional.
pub fn os_name() -> String {
    #[cfg(target_os = "macos")]
    {
        "macOS".to_string()
    }
    #[cfg(target_os = "linux")]
    {
        format!("Linux {}", gpui::guess_compositor())
    }
    #[cfg(target_os = "freebsd")]
    {
        format!("FreeBSD {}", gpui::guess_compositor())
    }

    #[cfg(target_os = "windows")]
    {
        "Windows".to_string()
    }
}
125
/// Returns the OS version string for telemetry.
///
/// Note: This might do blocking IO! Only call from background threads
/// (the Linux/FreeBSD path reads os-release files from disk).
pub fn os_version() -> String {
    #[cfg(target_os = "macos")]
    {
        use objc2_foundation::NSProcessInfo;
        let process_info = NSProcessInfo::processInfo();
        let version_nsstring = unsafe { process_info.operatingSystemVersionString() };
        // "Version 15.6.1 (Build 24G90)" -> "15.6.1 (Build 24G90)"
        let version_string = version_nsstring.to_string().replace("Version ", "");
        // "15.6.1 (Build 24G90)" -> "15.6.1"
        // "26.0.0 (Build 25A5349a)" -> unchanged (Beta or Rapid Security Response; ends with letter)
        MACOS_VERSION_REGEX
            .replace_all(&version_string, "")
            .to_string()
    }
    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
    {
        use std::path::Path;

        // Try the standard os-release locations in order.
        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
            file
        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
            file
        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/var/run/os-release")) {
            file
        } else {
            log::error!(
                "Failed to load /etc/os-release, /usr/lib/os-release, or /var/run/os-release"
            );
            "".to_string()
        };
        let mut name = "unknown";
        let mut version = "unknown";

        // Extract distro id and version from KEY=VALUE lines, stripping quotes.
        for line in content.lines() {
            match line.split_once('=') {
                Some(("ID", val)) => name = val.trim_matches('"'),
                Some(("VERSION_ID", val)) => version = val.trim_matches('"'),
                _ => {}
            }
        }

        format!("{} {}", name, version)
    }

    #[cfg(target_os = "windows")]
    {
        // SAFETY-ish note: RtlGetVersion fills the zeroed OSVERSIONINFOW;
        // falls back to "unknown" if the call reports failure.
        let mut info = unsafe { std::mem::zeroed() };
        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
        if status.is_ok() {
            semver::Version::new(
                info.dwMajorVersion as _,
                info.dwMinorVersion as _,
                info.dwBuildNumber as _,
            )
            .to_string()
        } else {
            "unknown".to_string()
        }
    }
}
187
188impl Telemetry {
189 pub fn new(
190 clock: Arc<dyn SystemClock>,
191 client: Arc<HttpClientWithUrl>,
192 cx: &mut App,
193 ) -> Arc<Self> {
194 let state = Arc::new(Mutex::new(TelemetryState {
195 settings: *TelemetrySettings::get_global(cx),
196 architecture: env::consts::ARCH,
197 release_channel: ReleaseChannel::try_global(cx),
198 system_id: None,
199 installation_id: None,
200 session_id: None,
201 metrics_id: None,
202 events_queue: Vec::new(),
203 flush_events_task: None,
204 log_file: None,
205 is_staff: None,
206 first_event_date_time: None,
207 event_coalescer: EventCoalescer::new(clock.clone()),
208 max_queue_size: MAX_QUEUE_LEN,
209 worktrees_with_project_type_events_sent: HashSet::new(),
210
211 os_version: None,
212 os_name: os_name(),
213 app_version: release_channel::AppVersion::global(cx).to_string(),
214 subscribers: Vec::new(),
215 }));
216
217 cx.background_spawn({
218 let state = state.clone();
219 let os_version = os_version();
220 state.lock().os_version = Some(os_version);
221 async move {
222 if let Some(tempfile) = File::create(Self::log_file_path()).ok() {
223 state.lock().log_file = Some(tempfile);
224 }
225 }
226 })
227 .detach();
228
229 cx.observe_global::<SettingsStore>({
230 let state = state.clone();
231
232 move |cx| {
233 let mut state = state.lock();
234 state.settings = *TelemetrySettings::get_global(cx);
235 }
236 })
237 .detach();
238
239 let this = Arc::new(Self {
240 clock,
241 http_client: client,
242 executor: cx.background_executor().clone(),
243 state,
244 });
245
246 let (tx, mut rx) = mpsc::unbounded();
247 ::telemetry::init(tx);
248
249 cx.background_spawn({
250 let this = Arc::downgrade(&this);
251 async move {
252 if cfg!(feature = "test-support") {
253 return;
254 }
255 while let Some(event) = rx.next().await {
256 let Some(state) = this.upgrade() else { break };
257 state.report_event(Event::Flexible(event))
258 }
259 }
260 })
261 .detach();
262
263 // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
264 // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
265 std::mem::forget(cx.on_app_quit({
266 let this = this.clone();
267 move |_| this.shutdown_telemetry()
268 }));
269
270 this
271 }
272
    /// Test builds skip telemetry shutdown entirely (no final event, no flush).
    #[cfg(any(test, feature = "test-support"))]
    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
        Task::ready(())
    }
277
    // Skip calling this function in tests.
    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
    #[cfg(not(any(test, feature = "test-support")))]
    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
        // Record the app-close event; the regular flush machinery uploads it.
        telemetry::event!("App Closed");
        // TODO: close final edit period and make sure it's sent
        Task::ready(())
    }
286
    /// Path of the on-disk telemetry log (one JSON-encoded event per line).
    pub fn log_file_path() -> PathBuf {
        paths::logs_dir().join("telemetry.log")
    }
290
    /// Creates a subscription combining the on-disk event history, the
    /// currently queued (unflushed) events, and a live stream of all events
    /// reported from now on.
    pub async fn subscribe_with_history(
        self: &Arc<Self>,
        fs: Arc<dyn Fs>,
    ) -> TelemetrySubscription {
        // Read the log before taking the lock so the lock is held briefly.
        let historical_events = self.read_log_file(fs).await;

        let mut state = self.state.lock();
        let queued_events: Vec<EventWrapper> = state.events_queue.clone();

        let (tx, rx) = mpsc::unbounded();
        state.subscribers.push(tx);

        drop(state);

        TelemetrySubscription {
            historical_events,
            queued_events,
            live_events: rx,
        }
    }
311
    /// Loads and parses the telemetry log file, reading at most the last
    /// 5 MiB and skipping any partial first line after truncation.
    async fn read_log_file(self: &Arc<Self>, fs: Arc<dyn Fs>) -> anyhow::Result<HistoricalEvents> {
        const MAX_LOG_READ: usize = 5 * 1024 * 1024;

        let path = Self::log_file_path();

        let content = fs
            .load_bytes(&path)
            .await
            .with_context(|| format!("failed to load telemetry log from {:?}", path))?;

        // When truncating, advance to the byte after the first newline so we
        // start on a complete line; fall back to the raw cut point if the
        // tail contains no newline at all.
        let start_offset = if content.len() > MAX_LOG_READ {
            let skip = content.len() - MAX_LOG_READ;
            content[skip..]
                .iter()
                .position(|&b| b == b'\n')
                .map(|pos| skip + pos + 1)
                .unwrap_or(skip)
        } else {
            0
        };

        let content_str = std::str::from_utf8(&content[start_offset..])
            .context("telemetry log file contains invalid UTF-8")?;

        let mut events = Vec::new();
        let mut parse_error_count = 0;

        // One JSON event per line; count (rather than fail on) bad lines.
        for line in content_str.lines() {
            if line.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<EventWrapper>(line) {
                Ok(event) => events.push(event),
                Err(_) => parse_error_count += 1,
            }
        }

        Ok(HistoricalEvents {
            events,
            parse_error_count,
        })
    }
354
    /// Whether a checksum seed is configured (compile-time or env var), i.e.
    /// whether uploaded requests will carry a non-empty `x-zed-checksum`.
    pub fn has_checksum_seed(&self) -> bool {
        ZED_CLIENT_CHECKSUM_SEED.is_some()
    }
358
    /// Records identity and version metadata for this app session. Called
    /// once at startup after the ids have been determined.
    pub fn start(
        self: &Arc<Self>,
        system_id: Option<String>,
        installation_id: Option<String>,
        session_id: String,
        cx: &App,
    ) {
        let mut state = self.state.lock();
        state.system_id = system_id.map(|id| id.into());
        state.installation_id = installation_id.map(|id| id.into());
        state.session_id = Some(session_id);
        state.app_version = release_channel::AppVersion::global(cx).to_string();
        state.os_name = os_name();
    }
373
    /// Whether the user has metrics collection enabled in settings.
    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
        self.state.lock().settings.metrics
    }
377
    /// Whether the user has diagnostics reporting enabled in settings.
    pub fn diagnostics_enabled(self: &Arc<Self>) -> bool {
        self.state.lock().settings.diagnostics
    }
381
    /// Stores the logged-in user's metrics id and staff flag.
    /// No-op when metrics are disabled, so neither value is retained then.
    pub fn set_authenticated_user_info(
        self: &Arc<Self>,
        metrics_id: Option<String>,
        is_staff: bool,
    ) {
        let mut state = self.state.lock();

        if !state.settings.metrics {
            return;
        }

        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
        state.metrics_id.clone_from(&metrics_id);
        state.is_staff = Some(is_staff);
        drop(state);
    }
398
    /// Reports an assistant interaction as a telemetry event whose name is
    /// derived from the interaction phase.
    pub fn report_assistant_event(self: &Arc<Self>, event: AssistantEventData) {
        let event_type = match event.phase {
            AssistantPhase::Response => "Assistant Responded",
            AssistantPhase::Invoked => "Assistant Invoked",
            AssistantPhase::Accepted => "Assistant Response Accepted",
            AssistantPhase::Rejected => "Assistant Response Rejected",
        };

        telemetry::event!(
            event_type,
            conversation_id = event.conversation_id,
            kind = event.kind,
            phase = event.phase,
            message_id = event.message_id,
            model = event.model,
            model_provider = event.model_provider,
            response_latency = event.response_latency,
            error_message = event.error_message,
            language_name = event.language_name,
        );
    }
420
    /// Feeds an edit into the coalescer and, at most once every ten minutes,
    /// reports any completed edit period as an "Editor Edited" event.
    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str, is_via_ssh: bool) {
        // Process-wide timestamp of the last reported edit event.
        static LAST_EVENT_TIME: Mutex<Option<Instant>> = Mutex::new(None);

        let mut state = self.state.lock();
        let period_data = state.event_coalescer.log_event(environment);
        drop(state);

        // try_lock: if another thread is reporting concurrently, skip
        // rather than block.
        if let Some(mut last_event) = LAST_EVENT_TIME.try_lock() {
            let current_time = std::time::Instant::now();
            let last_time = last_event.get_or_insert(current_time);

            if current_time.duration_since(*last_time) > Duration::from_secs(60 * 10) {
                *last_time = current_time;
            } else {
                // NOTE(review): this early return discards `period_data`, so
                // coalesced periods completed within the ten-minute window are
                // never reported — confirm this data loss is intended.
                return;
            }

            if let Some((start, end, environment)) = period_data {
                // Duration is capped at 24h to guard against clock anomalies.
                let duration = end
                    .saturating_duration_since(start)
                    .min(Duration::from_secs(60 * 60 * 24))
                    .as_millis() as i64;

                telemetry::event!(
                    "Editor Edited",
                    duration = duration,
                    environment = environment,
                    is_via_ssh = is_via_ssh
                );
            }
        }
    }
453
454 pub fn report_discovered_project_type_events(
455 self: &Arc<Self>,
456 worktree_id: WorktreeId,
457 updated_entries_set: &UpdatedEntriesSet,
458 ) {
459 let Some(project_types) = self.detect_project_types(worktree_id, updated_entries_set)
460 else {
461 return;
462 };
463
464 for project_type in project_types {
465 telemetry::event!("Project Opened", project_type = project_type);
466 }
467 }
468
    /// Scans `updated_entries_set` for marker files and returns the sorted
    /// list of detected project types ("dotnet", "node", "pnpm", "yarn").
    ///
    /// Returns `None` if this worktree has already had project types
    /// reported. The worktree is only marked as reported when at least one
    /// type was found, so an empty scan yields `Some(vec![])` and a later
    /// scan of the same worktree can still report.
    fn detect_project_types(
        self: &Arc<Self>,
        worktree_id: WorktreeId,
        updated_entries_set: &UpdatedEntriesSet,
    ) -> Option<Vec<String>> {
        let mut state = self.state.lock();

        if state
            .worktrees_with_project_type_events_sent
            .contains(&worktree_id)
        {
            return None;
        }

        // Set, so each project type is reported once per worktree even when
        // multiple matching files are present.
        let mut project_types: HashSet<&str> = HashSet::new();

        for (path, _, _) in updated_entries_set.iter() {
            let Some(file_name) = path.file_name() else {
                continue;
            };

            let project_type = match file_name {
                "pnpm-lock.yaml" => Some("pnpm"),
                "yarn.lock" => Some("yarn"),
                "package.json" => Some("node"),
                _ if DOTNET_PROJECT_FILES_REGEX.is_match(file_name) => Some("dotnet"),
                _ => None,
            };

            if let Some(project_type) = project_type {
                project_types.insert(project_type);
            };
        }

        if !project_types.is_empty() {
            state
                .worktrees_with_project_type_events_sent
                .insert(worktree_id);
        }

        // Sorted for deterministic event ordering.
        let mut project_types: Vec<_> = project_types.into_iter().map(String::from).collect();
        project_types.sort();
        Some(project_types)
    }
513
    /// Wraps `event` with batch metadata, forwards it to live subscribers,
    /// and enqueues it for upload. No-op when metrics are disabled.
    fn report_event(self: &Arc<Self>, mut event: Event) {
        let mut state = self.state.lock();
        // RUST_LOG=telemetry=trace to debug telemetry events
        log::trace!(target: "telemetry", "{:?}", event);

        if !state.settings.metrics {
            return;
        }

        // Tag every flexible event so the backend can tell it came from the
        // app itself.
        match &mut event {
            Event::Flexible(event) => event
                .event_properties
                .insert("event_source".into(), "zed".into()),
        };

        // Lazily start a single timer that flushes after FLUSH_INTERVAL; it
        // is cleared again inside flush_events_inner.
        if state.flush_events_task.is_none() {
            let this = self.clone();
            state.flush_events_task = Some(self.executor.spawn(async move {
                this.executor.timer(FLUSH_INTERVAL).await;
                this.flush_events().detach();
            }));
        }

        let date_time = self.clock.utc_now();

        // Timestamps are relative to the first event of the current batch,
        // capped at 24 hours.
        let milliseconds_since_first_event = match state.first_event_date_time {
            Some(first_event_date_time) => date_time
                .saturating_duration_since(first_event_date_time)
                .min(Duration::from_secs(60 * 60 * 24))
                .as_millis() as i64,
            None => {
                state.first_event_date_time = Some(date_time);
                0
            }
        };

        let signed_in = state.metrics_id.is_some();
        let event_wrapper = EventWrapper {
            signed_in,
            milliseconds_since_first_event,
            event,
        };

        // Drop subscribers whose receiving end has been closed.
        state
            .subscribers
            .retain(|tx| tx.unbounded_send(event_wrapper.clone()).is_ok());

        state.events_queue.push(event_wrapper);

        // Flush early once the queue is full, but only after `start` has
        // provided an installation id. The lock must be released first since
        // flush_events_inner re-acquires it.
        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
            drop(state);
            self.flush_events().detach();
        }
    }
568
    /// Metrics id of the signed-in user, if any.
    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
        self.state.lock().metrics_id.clone()
    }
572
    /// Per-system id, if one has been set via `start`.
    pub fn system_id(self: &Arc<Self>) -> Option<Arc<str>> {
        self.state.lock().system_id.clone()
    }
576
    /// Per-installation id, if one has been set via `start`.
    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
        self.state.lock().installation_id.clone()
    }
580
    /// Whether the signed-in user is staff; `None` until authenticated.
    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
        self.state.lock().is_staff
    }
584
    /// Serializes `event_request` into `json_bytes` and builds the POST
    /// request for the telemetry endpoint, attaching the seed-based checksum.
    fn build_request(
        self: &Arc<Self>,
        // We take in the JSON bytes buffer so we can reuse the existing allocation.
        mut json_bytes: Vec<u8>,
        event_request: &EventRequestBody,
    ) -> Result<Request<AsyncBody>> {
        json_bytes.clear();
        serde_json::to_writer(&mut json_bytes, event_request)?;

        // Empty checksum header when no seed is configured (e.g. dev builds).
        let checksum = calculate_json_checksum(&json_bytes).unwrap_or_default();

        Ok(Request::builder()
            .method(Method::POST)
            .uri(
                self.http_client
                    .build_zed_api_url("/telemetry/events", &[])?
                    .as_ref(),
            )
            .header("Content-Type", "application/json")
            .header("x-zed-checksum", checksum)
            .body(json_bytes.into())?)
    }
607
    /// Drains the event queue, appends the events to the local log file, and
    /// uploads them to the telemetry endpoint in a single request.
    pub async fn flush_events_inner(self: &Arc<Self>) -> Result<()> {
        let (json_bytes, request_body) = {
            let mut state = self.state.lock();
            state.first_event_date_time = None;
            // Queue is taken up front; NOTE(review): if the upload below
            // fails, these events are dropped from memory and survive only
            // in the local log file.
            let events = mem::take(&mut state.events_queue);
            // Clear the pending interval-flush task so the next event
            // schedules a fresh one.
            state.flush_events_task.take();
            if events.is_empty() {
                return Ok(());
            }

            let mut json_bytes = Vec::new();

            // Append one JSON-encoded event per line to the log file,
            // reusing a single serialization buffer.
            if let Some(file) = &mut state.log_file {
                for event in &events {
                    json_bytes.clear();
                    serde_json::to_writer(&mut json_bytes, event)?;
                    file.write_all(&json_bytes)?;
                    file.write_all(b"\n")?;
                }
            }

            (
                json_bytes,
                EventRequestBody {
                    system_id: state.system_id.as_deref().map(Into::into),
                    installation_id: state.installation_id.as_deref().map(Into::into),
                    session_id: state.session_id.clone(),
                    metrics_id: state.metrics_id.as_deref().map(Into::into),
                    is_staff: state.is_staff,
                    app_version: state.app_version.clone(),
                    os_name: state.os_name.clone(),
                    os_version: state.os_version.clone(),
                    architecture: state.architecture.to_string(),

                    release_channel: state
                        .release_channel
                        .map(|channel| channel.display_name().to_owned()),
                    events,
                },
            )
        }; // lock released before any await

        let request = self.build_request(json_bytes, &request_body)?;
        let response = self.http_client.send(request).await?;
        if response.status() != 200 {
            log::error!("Failed to send events: HTTP {:?}", response.status());
        }

        anyhow::Ok(())
    }
658
    /// Spawns a background task that flushes the queue, logging (rather than
    /// propagating) any error.
    pub fn flush_events(self: &Arc<Self>) -> Task<()> {
        let this = self.clone();
        self.executor.spawn(async move {
            this.flush_events_inner().await.log_err();
        })
    }
665}
666
667pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
668 let checksum_seed = ZED_CLIENT_CHECKSUM_SEED.as_ref()?;
669
670 let mut summer = Sha256::new();
671 summer.update(checksum_seed);
672 summer.update(json);
673 summer.update(checksum_seed);
674 let mut checksum = String::new();
675 for byte in summer.finalize().as_slice() {
676 use std::fmt::Write;
677 write!(&mut checksum, "{:02x}", byte).unwrap();
678 }
679
680 Some(checksum)
681}
682
683#[cfg(test)]
684mod tests {
685 use super::*;
686 use clock::FakeSystemClock;
687
688 use gpui::TestAppContext;
689 use http_client::FakeHttpClient;
690 use std::collections::HashMap;
691 use telemetry_events::FlexibleEvent;
692 use util::rel_path::RelPath;
693 use worktree::{PathChange, ProjectEntryId, WorktreeId};
694
    // Verifies that the queue accumulates events until max_queue_size is
    // reached, then flushes and resets the batching state.
    #[gpui::test]
    async fn test_telemetry_flush_on_max_queue_size(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let system_id = Some("system_id".to_string());
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        let (telemetry, first_date_time, event) = cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(system_id, installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let event_properties = HashMap::from_iter([(
                "test_key".to_string(),
                serde_json::Value::String("test_value".to_string()),
            )]);

            let event = FlexibleEvent {
                event_type: "test".to_string(),
                event_properties,
            };

            (telemetry, first_date_time, event)
        });

        cx.update(|_cx| {
            telemetry.report_event(Event::Flexible(event.clone()));
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            clock.advance(Duration::from_millis(100));

            telemetry.report_event(Event::Flexible(event.clone()));
            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            clock.advance(Duration::from_millis(100));

            telemetry.report_event(Event::Flexible(event.clone()));
            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            clock.advance(Duration::from_millis(100));

            // Adding a 4th event should cause a flush
            telemetry.report_event(Event::Flexible(event));
        });

        // Run the spawned flush task to completion
        executor.run_until_parked();

        cx.update(|_cx| {
            assert!(is_empty_state(&telemetry));
        });
    }
771
    // Verifies that a non-full queue flushes exactly when FLUSH_INTERVAL
    // elapses, and not one millisecond earlier.
    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let system_id = Some("system_id".to_string());
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(system_id, installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));
            let first_date_time = clock.utc_now();

            let event_properties = HashMap::from_iter([(
                "test_key".to_string(),
                serde_json::Value::String("test_value".to_string()),
            )]);

            let event = FlexibleEvent {
                event_type: "test".to_string(),
                event_properties,
            };

            telemetry.report_event(Event::Flexible(event));
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            let duration = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - duration);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(duration);

            assert!(is_empty_state(&telemetry));
        });
    }
823
    // Verifies the per-worktree dedup: a worktree reports a project type
    // once, and only after at least one marker file is actually found.
    #[gpui::test]
    fn test_project_discovery_does_not_double_report(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let telemetry = cx.update(|cx| Telemetry::new(clock.clone(), http, cx));
        let worktree_id = 1;

        // Scan of empty worktree finds nothing
        test_project_discovery_helper(telemetry.clone(), vec![], Some(vec![]), worktree_id);

        // Files added, second scan of worktree 1 finds project type
        test_project_discovery_helper(
            telemetry.clone(),
            vec!["package.json"],
            Some(vec!["node"]),
            worktree_id,
        );

        // Third scan of worktree does not double report, as we already reported
        test_project_discovery_helper(telemetry, vec!["package.json"], None, worktree_id);
    }
847
    // pnpm projects report both "node" (package.json) and "pnpm" (lockfile).
    #[gpui::test]
    fn test_pnpm_project_discovery(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let telemetry = cx.update(|cx| Telemetry::new(clock.clone(), http, cx));

        test_project_discovery_helper(
            telemetry,
            vec!["package.json", "pnpm-lock.yaml"],
            Some(vec!["node", "pnpm"]),
            1,
        );
    }
863
    // yarn projects report both "node" (package.json) and "yarn" (lockfile).
    #[gpui::test]
    fn test_yarn_project_discovery(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let telemetry = cx.update(|cx| Telemetry::new(clock.clone(), http, cx));

        test_project_discovery_helper(
            telemetry,
            vec!["package.json", "yarn.lock"],
            Some(vec!["node", "yarn"]),
            1,
        );
    }
879
    // Each .NET marker file (covered by DOTNET_PROJECT_FILES_REGEX) yields a
    // single "dotnet" project type.
    #[gpui::test]
    fn test_dotnet_project_discovery(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let telemetry = cx.update(|cx| Telemetry::new(clock.clone(), http, cx));

        // Using different worktrees, as production code blocks from reporting a
        // project type for the same worktree multiple times

        test_project_discovery_helper(
            telemetry.clone(),
            vec!["global.json"],
            Some(vec!["dotnet"]),
            1,
        );
        test_project_discovery_helper(
            telemetry.clone(),
            vec!["Directory.Build.props"],
            Some(vec!["dotnet"]),
            2,
        );
        test_project_discovery_helper(
            telemetry.clone(),
            vec!["file.csproj"],
            Some(vec!["dotnet"]),
            3,
        );
        test_project_discovery_helper(
            telemetry.clone(),
            vec!["file.fsproj"],
            Some(vec!["dotnet"]),
            4,
        );
        test_project_discovery_helper(
            telemetry.clone(),
            vec!["file.vbproj"],
            Some(vec!["dotnet"]),
            5,
        );
        test_project_discovery_helper(telemetry.clone(), vec!["file.sln"], Some(vec!["dotnet"]), 6);

        // Each worktree should only send a single project type event, even when
        // encountering multiple files associated with that project type
        test_project_discovery_helper(
            telemetry,
            vec!["global.json", "Directory.Build.props"],
            Some(vec!["dotnet"]),
            7,
        );
    }
932
933 // TODO:
934 // Test settings
935 // Update FakeHTTPClient to keep track of the number of requests and assert on it
936
    /// Creates and installs a test `SettingsStore` global so settings lookups
    /// in the code under test resolve.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }
943
944 fn is_empty_state(telemetry: &Telemetry) -> bool {
945 telemetry.state.lock().events_queue.is_empty()
946 && telemetry.state.lock().flush_events_task.is_none()
947 && telemetry.state.lock().first_event_date_time.is_none()
948 }
949
    /// Builds an `UpdatedEntriesSet` from `file_paths` and asserts that
    /// `detect_project_types` returns `expected_project_types` for the
    /// worktree identified by `worktree_id_num`.
    fn test_project_discovery_helper(
        telemetry: Arc<Telemetry>,
        file_paths: Vec<&str>,
        expected_project_types: Option<Vec<&str>>,
        worktree_id_num: usize,
    ) {
        let worktree_id = WorktreeId::from_usize(worktree_id_num);
        // Entries get sequential ids starting at 1; invalid paths are skipped.
        let entries: Vec<_> = file_paths
            .into_iter()
            .enumerate()
            .filter_map(|(i, path)| {
                Some((
                    Arc::from(RelPath::unix(path).ok()?),
                    ProjectEntryId::from_proto(i as u64 + 1),
                    PathChange::Added,
                ))
            })
            .collect();
        let updated_entries: UpdatedEntriesSet = Arc::from(entries.as_slice());

        let detected_project_types = telemetry.detect_project_types(worktree_id, &updated_entries);

        let expected_project_types =
            expected_project_types.map(|types| types.iter().map(|&t| t.to_string()).collect());

        assert_eq!(detected_project_types, expected_project_types);
    }
977}