telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, BackgroundExecutor, Task};
  8use http::{self, HttpClient, HttpClientWithUrl, Method};
  9use once_cell::sync::Lazy;
 10use parking_lot::Mutex;
 11use release_channel::ReleaseChannel;
 12use settings::{Settings, SettingsStore};
 13use sha2::{Digest, Sha256};
 14use std::io::Write;
 15use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 16use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
 17use telemetry_events::{
 18    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CpuEvent, EditEvent,
 19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent,
 20    MemoryEvent, SettingEvent,
 21};
 22use tempfile::NamedTempFile;
 23#[cfg(not(debug_assertions))]
 24use util::ResultExt;
 25use util::TryFutureExt;
 26
 27use self::event_coalescer::EventCoalescer;
 28
/// Collects telemetry events and sends them in batches to the Zed API.
///
/// All mutable bookkeeping lives in [`TelemetryState`], shared behind a mutex so that
/// background tasks can reach it.
pub struct Telemetry {
    // Source of the timestamps attached to queued events.
    clock: Arc<dyn SystemClock>,
    // Client used to build the telemetry URL and send the batched request.
    http_client: Arc<HttpClientWithUrl>,
    // Executor on which flush timers and upload tasks are spawned.
    executor: BackgroundExecutor,
    // Shared mutable state (settings snapshot, identifiers, event queue, ...).
    state: Arc<Mutex<TelemetryState>>,
}
 35
/// Mutable state behind [`Telemetry`]'s mutex.
struct TelemetryState {
    // Snapshot of the telemetry settings; refreshed whenever the settings store changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>,        // Per app launch
    // Display name of the release channel, when one is registered globally.
    release_channel: Option<&'static str>,
    // CPU architecture, taken from `env::consts::ARCH`.
    architecture: &'static str,
    // Events waiting to be sent by the next flush.
    events_queue: Vec<EventWrapper>,
    // Timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when no flush is pending.
    flush_events_task: Option<Task<()>>,
    // Temp file that flushed events are appended to, one JSON document per line.
    // Only created in builds without debug assertions.
    log_file: Option<NamedTempFile>,
    // Whether the authenticated user is staff; `None` until user info has been set.
    is_staff: Option<bool>,
    // Timestamp of the first event in the current batch; cleared on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    // Merges individual edit notifications into edit periods.
    event_coalescer: EventCoalescer,
    // Queue length at which an immediate flush is triggered.
    max_queue_size: usize,

    // Host/application details sent along with every batch.
    os_name: String,
    app_version: String,
    os_version: Option<String>,
}
 55
/// Initial value of `max_queue_size`: the queue length that triggers an immediate flush.
/// Kept small in debug builds.
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Initial value of `max_queue_size`: the queue length that triggers an immediate flush.
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between the first queued event of a batch and the timed flush (one second in
/// debug builds).
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event of a batch and the timed flush (five minutes).
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 67static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 68    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 69        .map(|s| s.as_bytes().into())
 70        .or_else(|| {
 71            env::var("ZED_CLIENT_CHECKSUM_SEED")
 72                .ok()
 73                .map(|s| s.as_bytes().into())
 74        })
 75});
 76
 77pub fn os_name() -> String {
 78    #[cfg(target_os = "macos")]
 79    {
 80        "macOS".to_string()
 81    }
 82    #[cfg(target_os = "linux")]
 83    {
 84        format!("Linux {}", gpui::guess_compositor())
 85    }
 86
 87    #[cfg(target_os = "windows")]
 88    {
 89        "Windows".to_string()
 90    }
 91}
 92
 93/// Note: This might do blocking IO! Only call from background threads
 94pub fn os_version() -> String {
 95    #[cfg(target_os = "macos")]
 96    {
 97        use cocoa::base::nil;
 98        use cocoa::foundation::NSProcessInfo;
 99
100        unsafe {
101            let process_info = cocoa::foundation::NSProcessInfo::processInfo(nil);
102            let version = process_info.operatingSystemVersion();
103            gpui::SemanticVersion::new(
104                version.majorVersion as usize,
105                version.minorVersion as usize,
106                version.patchVersion as usize,
107            )
108            .to_string()
109        }
110    }
111    #[cfg(target_os = "linux")]
112    {
113        use std::path::Path;
114
115        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
116            file
117        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
118            file
119        } else {
120            log::error!("Failed to load /etc/os-release, /usr/lib/os-release");
121            "".to_string()
122        };
123        let mut name = "unknown".to_string();
124        let mut version = "unknown".to_string();
125
126        for line in content.lines() {
127            if line.starts_with("ID=") {
128                name = line.trim_start_matches("ID=").trim_matches('"').to_string();
129            }
130            if line.starts_with("VERSION_ID=") {
131                version = line
132                    .trim_start_matches("VERSION_ID=")
133                    .trim_matches('"')
134                    .to_string();
135            }
136        }
137
138        format!("{} {}", name, version)
139    }
140
141    #[cfg(target_os = "windows")]
142    {
143        let mut info = unsafe { std::mem::zeroed() };
144        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
145        if status.is_ok() {
146            gpui::SemanticVersion::new(
147                info.dwMajorVersion as _,
148                info.dwMinorVersion as _,
149                info.dwBuildNumber as _,
150            )
151            .to_string()
152        } else {
153            "unknown".to_string()
154        }
155    }
156}
157
158impl Telemetry {
    /// Creates the shared telemetry instance.
    ///
    /// Registers [`TelemetrySettings`], seeds the state from the current global settings and
    /// keeps that snapshot up to date by observing the [`SettingsStore`]. In builds without
    /// debug assertions a temp file for logging events is created in the config directory on
    /// the background executor. Finally, `shutdown_telemetry` is hooked into app quit.
    pub fn new(
        clock: Arc<dyn SystemClock>,
        client: Arc<HttpClientWithUrl>,
        cx: &mut AppContext,
    ) -> Arc<Self> {
        // `None` when no release channel has been registered globally.
        let release_channel =
            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());

        // Must happen before `TelemetrySettings::get_global` is read below.
        TelemetrySettings::register(cx);

        let state = Arc::new(Mutex::new(TelemetryState {
            settings: *TelemetrySettings::get_global(cx),
            architecture: env::consts::ARCH,
            release_channel,
            installation_id: None,
            metrics_id: None,
            session_id: None,
            events_queue: Vec::new(),
            flush_events_task: None,
            log_file: None,
            is_staff: None,
            first_event_date_time: None,
            event_coalescer: EventCoalescer::new(clock.clone()),
            max_queue_size: MAX_QUEUE_LEN,

            os_version: None,
            os_name: os_name(),
            app_version: release_channel::AppVersion::global(cx).to_string(),
        }));

        // Creating the log file touches the filesystem, so it is done off the calling thread.
        // A failure is passed to `log_err` and leaves `log_file` as `None`.
        #[cfg(not(debug_assertions))]
        cx.background_executor()
            .spawn({
                let state = state.clone();
                async move {
                    if let Some(tempfile) =
                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
                    {
                        state.lock().log_file = Some(tempfile);
                    }
                }
            })
            .detach();

        // Re-read the telemetry settings every time the settings store changes.
        cx.observe_global::<SettingsStore>({
            let state = state.clone();

            move |cx| {
                let mut state = state.lock();
                state.settings = *TelemetrySettings::get_global(cx);
            }
        })
        .detach();

        // TODO: Replace all hardware stuff with nested SystemSpecs json
        let this = Arc::new(Self {
            clock,
            http_client: client,
            executor: cx.background_executor().clone(),
            state,
        });

        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
        std::mem::forget(cx.on_app_quit({
            let this = this.clone();
            move |_| this.shutdown_telemetry()
        }));

        this
    }
230
231    #[cfg(any(test, feature = "test-support"))]
232    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
233        Task::ready(())
234    }
235
236    // Skip calling this function in tests.
237    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
238    #[cfg(not(any(test, feature = "test-support")))]
239    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
240        self.report_app_event("close".to_string());
241        // TODO: close final edit period and make sure it's sent
242        Task::ready(())
243    }
244
245    pub fn log_file_path(&self) -> Option<PathBuf> {
246        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
247    }
248
249    pub fn start(
250        self: &Arc<Self>,
251        installation_id: Option<String>,
252        session_id: String,
253        cx: &mut AppContext,
254    ) {
255        let mut state = self.state.lock();
256        state.installation_id = installation_id.map(|id| id.into());
257        state.session_id = Some(session_id);
258        state.app_version = release_channel::AppVersion::global(cx).to_string();
259        state.os_name = os_version();
260
261        drop(state);
262
263        let this = self.clone();
264        cx.background_executor()
265            .spawn(async move {
266                let mut system = System::new_with_specifics(
267                    RefreshKind::new().with_cpu(CpuRefreshKind::everything()),
268                );
269
270                let refresh_kind = ProcessRefreshKind::new().with_cpu().with_memory();
271                let current_process = Pid::from_u32(std::process::id());
272                system.refresh_process_specifics(current_process, refresh_kind);
273
274                // Waiting some amount of time before the first query is important to get a reasonable value
275                // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
276                const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
277
278                loop {
279                    smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
280
281                    let current_process = Pid::from_u32(std::process::id());
282                    system.refresh_process_specifics(current_process, refresh_kind);
283                    let Some(process) = system.process(current_process) else {
284                        log::error!(
285                            "Failed to find own process {current_process:?} in system process table"
286                        );
287                        // TODO: Fire an error telemetry event
288                        return;
289                    };
290
291                    this.report_memory_event(process.memory(), process.virtual_memory());
292                    this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
293                }
294            })
295            .detach();
296    }
297
298    pub fn set_authenticated_user_info(
299        self: &Arc<Self>,
300        metrics_id: Option<String>,
301        is_staff: bool,
302    ) {
303        let mut state = self.state.lock();
304
305        if !state.settings.metrics {
306            return;
307        }
308
309        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
310        state.metrics_id.clone_from(&metrics_id);
311        state.is_staff = Some(is_staff);
312        drop(state);
313    }
314
315    pub fn report_editor_event(
316        self: &Arc<Self>,
317        file_extension: Option<String>,
318        vim_mode: bool,
319        operation: &'static str,
320        copilot_enabled: bool,
321        copilot_enabled_for_language: bool,
322    ) {
323        let event = Event::Editor(EditorEvent {
324            file_extension,
325            vim_mode,
326            operation: operation.into(),
327            copilot_enabled,
328            copilot_enabled_for_language,
329        });
330
331        self.report_event(event)
332    }
333
334    pub fn report_inline_completion_event(
335        self: &Arc<Self>,
336        provider: String,
337        suggestion_accepted: bool,
338        file_extension: Option<String>,
339    ) {
340        let event = Event::InlineCompletion(InlineCompletionEvent {
341            provider,
342            suggestion_accepted,
343            file_extension,
344        });
345
346        self.report_event(event)
347    }
348
349    pub fn report_assistant_event(
350        self: &Arc<Self>,
351        conversation_id: Option<String>,
352        kind: AssistantKind,
353        model: String,
354        response_latency: Option<Duration>,
355        error_message: Option<String>,
356    ) {
357        let event = Event::Assistant(AssistantEvent {
358            conversation_id,
359            kind,
360            model: model.to_string(),
361            response_latency,
362            error_message,
363        });
364
365        self.report_event(event)
366    }
367
368    pub fn report_call_event(
369        self: &Arc<Self>,
370        operation: &'static str,
371        room_id: Option<u64>,
372        channel_id: Option<ChannelId>,
373    ) {
374        let event = Event::Call(CallEvent {
375            operation: operation.to_string(),
376            room_id,
377            channel_id: channel_id.map(|cid| cid.0),
378        });
379
380        self.report_event(event)
381    }
382
383    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
384        let event = Event::Cpu(CpuEvent {
385            usage_as_percentage,
386            core_count,
387        });
388
389        self.report_event(event)
390    }
391
392    pub fn report_memory_event(
393        self: &Arc<Self>,
394        memory_in_bytes: u64,
395        virtual_memory_in_bytes: u64,
396    ) {
397        let event = Event::Memory(MemoryEvent {
398            memory_in_bytes,
399            virtual_memory_in_bytes,
400        });
401
402        self.report_event(event)
403    }
404
405    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
406        let event = Event::App(AppEvent { operation });
407
408        self.report_event(event.clone());
409
410        event
411    }
412
413    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
414        let event = Event::Setting(SettingEvent {
415            setting: setting.to_string(),
416            value,
417        });
418
419        self.report_event(event)
420    }
421
422    pub fn report_extension_event(self: &Arc<Self>, extension_id: Arc<str>, version: Arc<str>) {
423        self.report_event(Event::Extension(ExtensionEvent {
424            extension_id,
425            version,
426        }))
427    }
428
429    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
430        let mut state = self.state.lock();
431        let period_data = state.event_coalescer.log_event(environment);
432        drop(state);
433
434        if let Some((start, end, environment)) = period_data {
435            let event = Event::Edit(EditEvent {
436                duration: end.timestamp_millis() - start.timestamp_millis(),
437                environment: environment.to_string(),
438            });
439
440            self.report_event(event);
441        }
442    }
443
444    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
445        let event = Event::Action(ActionEvent {
446            source: source.to_string(),
447            action,
448        });
449
450        self.report_event(event)
451    }
452
453    fn report_event(self: &Arc<Self>, event: Event) {
454        let mut state = self.state.lock();
455
456        if !state.settings.metrics {
457            return;
458        }
459
460        if state.flush_events_task.is_none() {
461            let this = self.clone();
462            let executor = self.executor.clone();
463            state.flush_events_task = Some(self.executor.spawn(async move {
464                executor.timer(FLUSH_INTERVAL).await;
465                this.flush_events();
466            }));
467        }
468
469        let date_time = self.clock.utc_now();
470
471        let milliseconds_since_first_event = match state.first_event_date_time {
472            Some(first_event_date_time) => {
473                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
474            }
475            None => {
476                state.first_event_date_time = Some(date_time);
477                0
478            }
479        };
480
481        let signed_in = state.metrics_id.is_some();
482        state.events_queue.push(EventWrapper {
483            signed_in,
484            milliseconds_since_first_event,
485            event,
486        });
487
488        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
489            drop(state);
490            self.flush_events();
491        }
492    }
493
494    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
495        self.state.lock().metrics_id.clone()
496    }
497
498    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
499        self.state.lock().installation_id.clone()
500    }
501
502    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
503        self.state.lock().is_staff
504    }
505
    /// Drains the event queue and sends the drained events to the telemetry endpoint.
    ///
    /// The queue, the first-event timestamp and the pending flush timer are always reset.
    /// Nothing is sent when the queue was empty or when no checksum seed is configured; in
    /// the latter case the drained events are discarded. Otherwise a detached background
    /// task appends every event to the log file (when one exists) as one JSON document per
    /// line, then POSTs the whole batch to `/telemetry/events` with an `x-zed-checksum`
    /// header. Errors raised inside the task are handed to `log_err` instead of being
    /// returned to the caller.
    pub fn flush_events(self: &Arc<Self>) {
        let mut state = self.state.lock();
        state.first_event_date_time = None;
        let mut events = mem::take(&mut state.events_queue);
        // Clear the timer slot so the next reported event arms a fresh timer.
        state.flush_events_task.take();
        drop(state);
        if events.is_empty() {
            return;
        }

        // Without a seed no checksum can be computed, so the request is not attempted.
        if ZED_CLIENT_CHECKSUM_SEED.is_none() {
            return;
        };

        let this = self.clone();
        self.executor
            .spawn(
                async move {
                    // Scratch buffer reused for every serialization below.
                    let mut json_bytes = Vec::new();

                    // The lock guard created in the `if let` scrutinee lives for the whole
                    // block, so the state stays locked while the log file is written.
                    if let Some(file) = &mut this.state.lock().log_file {
                        let file = file.as_file_mut();
                        for event in &mut events {
                            json_bytes.clear();
                            serde_json::to_writer(&mut json_bytes, event)?;
                            file.write_all(&json_bytes)?;
                            file.write_all(b"\n")?;
                        }
                    }

                    // Serialize the request inside a block so the lock is released before
                    // the request is awaited.
                    {
                        let state = this.state.lock();

                        let request_body = EventRequestBody {
                            installation_id: state.installation_id.as_deref().map(Into::into),
                            session_id: state.session_id.clone(),
                            is_staff: state.is_staff,
                            app_version: state.app_version.clone(),
                            os_name: state.os_name.clone(),
                            os_version: state.os_version.clone(),
                            architecture: state.architecture.to_string(),

                            release_channel: state.release_channel.map(Into::into),
                            events,
                        };
                        json_bytes.clear();
                        serde_json::to_writer(&mut json_bytes, &request_body)?;
                    }

                    let Some(checksum) = calculate_json_checksum(&json_bytes) else {
                        return Ok(());
                    };

                    let request = http::Request::builder()
                        .method(Method::POST)
                        .uri(
                            this.http_client
                                .build_zed_api_url("/telemetry/events", &[])?
                                .as_ref(),
                        )
                        .header("Content-Type", "text/plain")
                        .header("x-zed-checksum", checksum)
                        .body(json_bytes.into());

                    let response = this.http_client.send(request?).await?;
                    // Any status other than 200 is only logged; the events are not re-queued.
                    if response.status() != 200 {
                        log::error!("Failed to send events: HTTP {:?}", response.status());
                    }
                    anyhow::Ok(())
                }
                .log_err(),
            )
            .detach();
    }
580}
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585    use chrono::TimeZone;
586    use clock::FakeSystemClock;
587    use gpui::TestAppContext;
588    use http::FakeHttpClient;
589
    /// With `max_queue_size` set to 4, the first three events accumulate in the queue
    /// (keeping the flush timer and the first-event timestamp), and the fourth event
    /// triggers a flush that leaves the state empty.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            // 1st event: queued, timer armed, batch start recorded.
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App(AppEvent {
                    operation: operation.clone(),
                })
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            clock.advance(chrono::Duration::milliseconds(100));

            // 2nd event: the batch start must stay at the first event's timestamp.
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App(AppEvent {
                    operation: operation.clone(),
                })
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            clock.advance(chrono::Duration::milliseconds(100));

            // 3rd event: still one short of the limit.
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App(AppEvent {
                    operation: operation.clone(),
                })
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            clock.advance(chrono::Duration::milliseconds(100));

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App(AppEvent {
                    operation: operation.clone(),
                })
            );

            assert!(is_empty_state(&telemetry));
        });
    }
671
    /// A single queued event stays in the queue until exactly `FLUSH_INTERVAL` has elapsed
    /// on the executor's clock, at which point the timed flush empties the state.
    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ));
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            // Large enough that the single event below cannot trigger a size-based flush.
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App(AppEvent {
                    operation: operation.clone(),
                })
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            let duration = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - duration);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(duration);

            assert!(is_empty_state(&telemetry));
        });
    }
722
723    // TODO:
724    // Test settings
725    // Update FakeHTTPClient to keep track of the number of requests and assert on it
726
727    fn init_test(cx: &mut TestAppContext) {
728        cx.update(|cx| {
729            let settings_store = SettingsStore::test(cx);
730            cx.set_global(settings_store);
731        });
732    }
733
734    fn is_empty_state(telemetry: &Telemetry) -> bool {
735        telemetry.state.lock().events_queue.is_empty()
736            && telemetry.state.lock().flush_events_task.is_none()
737            && telemetry.state.lock().first_event_date_time.is_none()
738    }
739}
740
741pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
742    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
743        return None;
744    };
745
746    let mut summer = Sha256::new();
747    summer.update(checksum_seed);
748    summer.update(&json);
749    summer.update(checksum_seed);
750    let mut checksum = String::new();
751    for byte in summer.finalize().as_slice() {
752        use std::fmt::Write;
753        write!(&mut checksum, "{:02x}", byte).unwrap();
754    }
755
756    Some(checksum)
757}