telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use anyhow::Result;
  5use clock::SystemClock;
  6use collections::{HashMap, HashSet};
  7use futures::channel::mpsc;
  8use futures::{Future, StreamExt};
  9use gpui::{AppContext, BackgroundExecutor, Task};
 10use http_client::{self, AsyncBody, HttpClient, HttpClientWithUrl, Method, Request};
 11use once_cell::sync::Lazy;
 12use parking_lot::Mutex;
 13use release_channel::ReleaseChannel;
 14use settings::{Settings, SettingsStore};
 15use sha2::{Digest, Sha256};
 16use std::fs::File;
 17use std::io::Write;
 18use std::time::Instant;
 19use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 20use telemetry_events::{
 21    AppEvent, AssistantEvent, CallEvent, EditEvent, Event, EventRequestBody, EventWrapper,
 22    InlineCompletionEvent, InlineCompletionRating, InlineCompletionRatingEvent, SettingEvent,
 23};
 24use util::{ResultExt, TryFutureExt};
 25use worktree::{UpdatedEntriesSet, WorktreeId};
 26
 27use self::event_coalescer::EventCoalescer;
 28
 29pub struct Telemetry {
 30    clock: Arc<dyn SystemClock>,
 31    http_client: Arc<HttpClientWithUrl>,
 32    executor: BackgroundExecutor,
 33    state: Arc<Mutex<TelemetryState>>,
 34}
 35
 36struct TelemetryState {
 37    settings: TelemetrySettings,
 38    system_id: Option<Arc<str>>,       // Per system
 39    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 40    session_id: Option<String>,        // Per app launch
 41    metrics_id: Option<Arc<str>>,      // Per logged-in user
 42    release_channel: Option<&'static str>,
 43    architecture: &'static str,
 44    events_queue: Vec<EventWrapper>,
 45    flush_events_task: Option<Task<()>>,
 46    log_file: Option<File>,
 47    is_staff: Option<bool>,
 48    first_event_date_time: Option<Instant>,
 49    event_coalescer: EventCoalescer,
 50    max_queue_size: usize,
 51    worktree_id_map: WorktreeIdMap,
 52
 53    os_name: String,
 54    app_version: String,
 55    os_version: Option<String>,
 56}
 57
 58#[derive(Debug)]
 59struct WorktreeIdMap(HashMap<String, ProjectCache>);
 60
 61#[derive(Debug)]
 62struct ProjectCache {
 63    name: String,
 64    worktree_ids_reported: HashSet<WorktreeId>,
 65}
 66
 67impl ProjectCache {
 68    fn new(name: String) -> Self {
 69        Self {
 70            name,
 71            worktree_ids_reported: HashSet::default(),
 72        }
 73    }
 74}
 75
 76#[cfg(debug_assertions)]
 77const MAX_QUEUE_LEN: usize = 5;
 78
 79#[cfg(not(debug_assertions))]
 80const MAX_QUEUE_LEN: usize = 50;
 81
 82#[cfg(debug_assertions)]
 83const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 84
 85#[cfg(not(debug_assertions))]
 86const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 87static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 88    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 89        .map(|s| s.as_bytes().into())
 90        .or_else(|| {
 91            env::var("ZED_CLIENT_CHECKSUM_SEED")
 92                .ok()
 93                .map(|s| s.as_bytes().into())
 94        })
 95});
 96
 97pub fn os_name() -> String {
 98    #[cfg(target_os = "macos")]
 99    {
100        "macOS".to_string()
101    }
102    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
103    {
104        format!("Linux {}", gpui::guess_compositor())
105    }
106
107    #[cfg(target_os = "windows")]
108    {
109        "Windows".to_string()
110    }
111}
112
113/// Note: This might do blocking IO! Only call from background threads
114pub fn os_version() -> String {
115    #[cfg(target_os = "macos")]
116    {
117        use cocoa::base::nil;
118        use cocoa::foundation::NSProcessInfo;
119
120        unsafe {
121            let process_info = cocoa::foundation::NSProcessInfo::processInfo(nil);
122            let version = process_info.operatingSystemVersion();
123            gpui::SemanticVersion::new(
124                version.majorVersion as usize,
125                version.minorVersion as usize,
126                version.patchVersion as usize,
127            )
128            .to_string()
129        }
130    }
131    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
132    {
133        use std::path::Path;
134
135        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
136            file
137        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
138            file
139        } else {
140            log::error!("Failed to load /etc/os-release, /usr/lib/os-release");
141            "".to_string()
142        };
143        let mut name = "unknown".to_string();
144        let mut version = "unknown".to_string();
145
146        for line in content.lines() {
147            if line.starts_with("ID=") {
148                name = line.trim_start_matches("ID=").trim_matches('"').to_string();
149            }
150            if line.starts_with("VERSION_ID=") {
151                version = line
152                    .trim_start_matches("VERSION_ID=")
153                    .trim_matches('"')
154                    .to_string();
155            }
156        }
157
158        format!("{} {}", name, version)
159    }
160
161    #[cfg(target_os = "windows")]
162    {
163        let mut info = unsafe { std::mem::zeroed() };
164        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
165        if status.is_ok() {
166            gpui::SemanticVersion::new(
167                info.dwMajorVersion as _,
168                info.dwMinorVersion as _,
169                info.dwBuildNumber as _,
170            )
171            .to_string()
172        } else {
173            "unknown".to_string()
174        }
175    }
176}
177
178impl Telemetry {
179    pub fn new(
180        clock: Arc<dyn SystemClock>,
181        client: Arc<HttpClientWithUrl>,
182        cx: &mut AppContext,
183    ) -> Arc<Self> {
184        let release_channel =
185            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
186
187        TelemetrySettings::register(cx);
188
189        let state = Arc::new(Mutex::new(TelemetryState {
190            settings: *TelemetrySettings::get_global(cx),
191            architecture: env::consts::ARCH,
192            release_channel,
193            system_id: None,
194            installation_id: None,
195            session_id: None,
196            metrics_id: None,
197            events_queue: Vec::new(),
198            flush_events_task: None,
199            log_file: None,
200            is_staff: None,
201            first_event_date_time: None,
202            event_coalescer: EventCoalescer::new(clock.clone()),
203            max_queue_size: MAX_QUEUE_LEN,
204            worktree_id_map: WorktreeIdMap(HashMap::from_iter([
205                (
206                    "pnpm-lock.yaml".to_string(),
207                    ProjectCache::new("pnpm".to_string()),
208                ),
209                (
210                    "yarn.lock".to_string(),
211                    ProjectCache::new("yarn".to_string()),
212                ),
213                (
214                    "package.json".to_string(),
215                    ProjectCache::new("node".to_string()),
216                ),
217            ])),
218
219            os_version: None,
220            os_name: os_name(),
221            app_version: release_channel::AppVersion::global(cx).to_string(),
222        }));
223        Self::log_file_path();
224
225        cx.background_executor()
226            .spawn({
227                let state = state.clone();
228                let os_version = os_version();
229                state.lock().os_version = Some(os_version.clone());
230                async move {
231                    if let Some(tempfile) = File::create(Self::log_file_path()).log_err() {
232                        state.lock().log_file = Some(tempfile);
233                    }
234                }
235            })
236            .detach();
237
238        cx.observe_global::<SettingsStore>({
239            let state = state.clone();
240
241            move |cx| {
242                let mut state = state.lock();
243                state.settings = *TelemetrySettings::get_global(cx);
244            }
245        })
246        .detach();
247
248        let this = Arc::new(Self {
249            clock,
250            http_client: client,
251            executor: cx.background_executor().clone(),
252            state,
253        });
254
255        let (tx, mut rx) = mpsc::unbounded();
256        ::telemetry::init(tx);
257
258        cx.background_executor()
259            .spawn({
260                let this = Arc::downgrade(&this);
261                async move {
262                    while let Some(event) = rx.next().await {
263                        let Some(state) = this.upgrade() else { break };
264                        state.report_event(Event::Flexible(event))
265                    }
266                }
267            })
268            .detach();
269
270        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
271        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
272        std::mem::forget(cx.on_app_quit({
273            let this = this.clone();
274            move |_| this.shutdown_telemetry()
275        }));
276
277        this
278    }
279
280    #[cfg(any(test, feature = "test-support"))]
281    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
282        Task::ready(())
283    }
284
285    // Skip calling this function in tests.
286    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
287    #[cfg(not(any(test, feature = "test-support")))]
288    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
289        self.report_app_event("close".to_string());
290        // TODO: close final edit period and make sure it's sent
291        Task::ready(())
292    }
293
294    pub fn log_file_path() -> PathBuf {
295        paths::logs_dir().join("telemetry.log")
296    }
297
298    pub fn start(
299        self: &Arc<Self>,
300        system_id: Option<String>,
301        installation_id: Option<String>,
302        session_id: String,
303        cx: &AppContext,
304    ) {
305        let mut state = self.state.lock();
306        state.system_id = system_id.map(|id| id.into());
307        state.installation_id = installation_id.map(|id| id.into());
308        state.session_id = Some(session_id);
309        state.app_version = release_channel::AppVersion::global(cx).to_string();
310        state.os_name = os_name();
311    }
312
313    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
314        let state = self.state.lock();
315        let enabled = state.settings.metrics;
316        drop(state);
317        enabled
318    }
319
320    pub fn set_authenticated_user_info(
321        self: &Arc<Self>,
322        metrics_id: Option<String>,
323        is_staff: bool,
324    ) {
325        let mut state = self.state.lock();
326
327        if !state.settings.metrics {
328            return;
329        }
330
331        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
332        state.metrics_id.clone_from(&metrics_id);
333        state.is_staff = Some(is_staff);
334        drop(state);
335    }
336
337    pub fn report_inline_completion_event(
338        self: &Arc<Self>,
339        provider: String,
340        suggestion_accepted: bool,
341        file_extension: Option<String>,
342    ) {
343        let event = Event::InlineCompletion(InlineCompletionEvent {
344            provider,
345            suggestion_accepted,
346            file_extension,
347        });
348
349        self.report_event(event)
350    }
351
352    pub fn report_inline_completion_rating_event(
353        self: &Arc<Self>,
354        rating: InlineCompletionRating,
355        input_events: Arc<str>,
356        input_excerpt: Arc<str>,
357        output_excerpt: Arc<str>,
358        feedback: String,
359    ) {
360        let event = Event::InlineCompletionRating(InlineCompletionRatingEvent {
361            rating,
362            input_events,
363            input_excerpt,
364            output_excerpt,
365            feedback,
366        });
367        self.report_event(event);
368    }
369
370    pub fn report_assistant_event(self: &Arc<Self>, event: AssistantEvent) {
371        self.report_event(Event::Assistant(event));
372    }
373
374    pub fn report_call_event(
375        self: &Arc<Self>,
376        operation: &'static str,
377        room_id: Option<u64>,
378        channel_id: Option<ChannelId>,
379    ) {
380        let event = Event::Call(CallEvent {
381            operation: operation.to_string(),
382            room_id,
383            channel_id: channel_id.map(|cid| cid.0),
384        });
385
386        self.report_event(event)
387    }
388
389    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
390        let event = Event::App(AppEvent { operation });
391
392        self.report_event(event.clone());
393
394        event
395    }
396
397    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
398        let event = Event::Setting(SettingEvent {
399            setting: setting.to_string(),
400            value,
401        });
402
403        self.report_event(event)
404    }
405
406    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str, is_via_ssh: bool) {
407        let mut state = self.state.lock();
408        let period_data = state.event_coalescer.log_event(environment);
409        drop(state);
410
411        if let Some((start, end, environment)) = period_data {
412            let event = Event::Edit(EditEvent {
413                duration: end
414                    .saturating_duration_since(start)
415                    .min(Duration::from_secs(60 * 60 * 24))
416                    .as_millis() as i64,
417                environment: environment.to_string(),
418                is_via_ssh,
419            });
420
421            self.report_event(event);
422        }
423    }
424
425    pub fn report_discovered_project_events(
426        self: &Arc<Self>,
427        worktree_id: WorktreeId,
428        updated_entries_set: &UpdatedEntriesSet,
429    ) {
430        let project_type_names: Vec<String> = {
431            let mut state = self.state.lock();
432            state
433                .worktree_id_map
434                .0
435                .iter_mut()
436                .filter_map(|(project_file_name, project_type_telemetry)| {
437                    if project_type_telemetry
438                        .worktree_ids_reported
439                        .contains(&worktree_id)
440                    {
441                        return None;
442                    }
443
444                    let project_file_found = updated_entries_set.iter().any(|(path, _, _)| {
445                        path.as_ref()
446                            .file_name()
447                            .and_then(|name| name.to_str())
448                            .map(|name_str| name_str == project_file_name)
449                            .unwrap_or(false)
450                    });
451
452                    if !project_file_found {
453                        return None;
454                    }
455
456                    project_type_telemetry
457                        .worktree_ids_reported
458                        .insert(worktree_id);
459
460                    Some(project_type_telemetry.name.clone())
461                })
462                .collect()
463        };
464
465        // Done on purpose to avoid calling `self.state.lock()` multiple times
466        for project_type_name in project_type_names {
467            self.report_app_event(format!("open {} project", project_type_name));
468        }
469    }
470
471    fn report_event(self: &Arc<Self>, event: Event) {
472        let mut state = self.state.lock();
473
474        if !state.settings.metrics {
475            return;
476        }
477
478        if state.flush_events_task.is_none() {
479            let this = self.clone();
480            let executor = self.executor.clone();
481            state.flush_events_task = Some(self.executor.spawn(async move {
482                executor.timer(FLUSH_INTERVAL).await;
483                this.flush_events();
484            }));
485        }
486
487        let date_time = self.clock.utc_now();
488
489        let milliseconds_since_first_event = match state.first_event_date_time {
490            Some(first_event_date_time) => date_time
491                .saturating_duration_since(first_event_date_time)
492                .min(Duration::from_secs(60 * 60 * 24))
493                .as_millis() as i64,
494            None => {
495                state.first_event_date_time = Some(date_time);
496                0
497            }
498        };
499
500        let signed_in = state.metrics_id.is_some();
501        state.events_queue.push(EventWrapper {
502            signed_in,
503            milliseconds_since_first_event,
504            event,
505        });
506
507        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
508            drop(state);
509            self.flush_events();
510        }
511    }
512
513    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
514        self.state.lock().metrics_id.clone()
515    }
516
517    pub fn system_id(self: &Arc<Self>) -> Option<Arc<str>> {
518        self.state.lock().system_id.clone()
519    }
520
521    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
522        self.state.lock().installation_id.clone()
523    }
524
525    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
526        self.state.lock().is_staff
527    }
528
529    fn build_request(
530        self: &Arc<Self>,
531        // We take in the JSON bytes buffer so we can reuse the existing allocation.
532        mut json_bytes: Vec<u8>,
533        event_request: EventRequestBody,
534    ) -> Result<Request<AsyncBody>> {
535        json_bytes.clear();
536        serde_json::to_writer(&mut json_bytes, &event_request)?;
537
538        let checksum = calculate_json_checksum(&json_bytes).unwrap_or("".to_string());
539
540        Ok(Request::builder()
541            .method(Method::POST)
542            .uri(
543                self.http_client
544                    .build_zed_api_url("/telemetry/events", &[])?
545                    .as_ref(),
546            )
547            .header("Content-Type", "application/json")
548            .header("x-zed-checksum", checksum)
549            .body(json_bytes.into())?)
550    }
551
552    pub fn flush_events(self: &Arc<Self>) {
553        let mut state = self.state.lock();
554        state.first_event_date_time = None;
555        let mut events = mem::take(&mut state.events_queue);
556        state.flush_events_task.take();
557        drop(state);
558        if events.is_empty() {
559            return;
560        }
561
562        let this = self.clone();
563        self.executor
564            .spawn(
565                async move {
566                    let mut json_bytes = Vec::new();
567
568                    if let Some(file) = &mut this.state.lock().log_file {
569                        for event in &mut events {
570                            json_bytes.clear();
571                            serde_json::to_writer(&mut json_bytes, event)?;
572                            file.write_all(&json_bytes)?;
573                            file.write_all(b"\n")?;
574                        }
575                    }
576
577                    let request_body = {
578                        let state = this.state.lock();
579
580                        EventRequestBody {
581                            system_id: state.system_id.as_deref().map(Into::into),
582                            installation_id: state.installation_id.as_deref().map(Into::into),
583                            session_id: state.session_id.clone(),
584                            metrics_id: state.metrics_id.as_deref().map(Into::into),
585                            is_staff: state.is_staff,
586                            app_version: state.app_version.clone(),
587                            os_name: state.os_name.clone(),
588                            os_version: state.os_version.clone(),
589                            architecture: state.architecture.to_string(),
590
591                            release_channel: state.release_channel.map(Into::into),
592                            events,
593                        }
594                    };
595
596                    let request = this.build_request(json_bytes, request_body)?;
597                    let response = this.http_client.send(request).await?;
598                    if response.status() != 200 {
599                        log::error!("Failed to send events: HTTP {:?}", response.status());
600                    }
601                    anyhow::Ok(())
602                }
603                .log_err(),
604            )
605            .detach();
606    }
607}
608
609pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
610    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
611        return None;
612    };
613
614    let mut summer = Sha256::new();
615    summer.update(checksum_seed);
616    summer.update(json);
617    summer.update(checksum_seed);
618    let mut checksum = String::new();
619    for byte in summer.finalize().as_slice() {
620        use std::fmt::Write;
621        write!(&mut checksum, "{:02x}", byte).unwrap();
622    }
623
624    Some(checksum)
625}
626
#[cfg(test)]
mod tests {
    use super::*;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use http_client::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let system_id = Some("system_id".to_string());
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(system_id, installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            // Below the queue limit, events stay queued and are timed from the first one.
            for expected_queue_len in 1..=3 {
                report_and_check_app_event(&telemetry, &operation);
                assert_queued(&telemetry, expected_queue_len, first_date_time);
                clock.advance(Duration::from_millis(100));
            }

            // Adding a 4th event should cause a flush
            report_and_check_app_event(&telemetry, &operation);
            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let system_id = Some("system_id".to_string());
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(system_id, installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            report_and_check_app_event(&telemetry, &operation);
            assert_queued(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);
            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);
            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test settings store so `TelemetrySettings` can be registered.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Reports an app event for `operation` and checks the returned event matches it.
    fn report_and_check_app_event(telemetry: &Arc<Telemetry>, operation: &str) {
        let event = telemetry.report_app_event(operation.to_string());
        assert_eq!(
            event,
            Event::App(AppEvent {
                operation: operation.to_string(),
            })
        );
    }

    /// Asserts that events are queued but not yet flushed.
    fn assert_queued(telemetry: &Telemetry, expected_queue_len: usize, first_date_time: Instant) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_queue_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued, no flush is scheduled, and no first-event time is recorded.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}