telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  8use http::{self, HttpClient, HttpClientWithUrl, Method};
  9use once_cell::sync::Lazy;
 10use parking_lot::Mutex;
 11use release_channel::ReleaseChannel;
 12use settings::{Settings, SettingsStore};
 13use sha2::{Digest, Sha256};
 14use std::io::Write;
 15use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 16use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
 17use telemetry_events::{
 18    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CpuEvent, EditEvent,
 19    EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent,
 20    MemoryEvent, SettingEvent,
 21};
 22use tempfile::NamedTempFile;
 23#[cfg(not(debug_assertions))]
 24use util::ResultExt;
 25use util::TryFutureExt;
 26
 27use self::event_coalescer::EventCoalescer;
 28
/// Collects telemetry events, queues them, and uploads them in batches.
///
/// Instances are created with [`Telemetry::new`], which hands back an `Arc<Self>`; all mutable
/// bookkeeping lives behind the shared `state` mutex.
pub struct Telemetry {
    /// Time source used to timestamp reported events.
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to POST event batches to the `/telemetry/events` endpoint.
    http_client: Arc<HttpClientWithUrl>,
    /// Background executor on which the flush timer and uploads are spawned.
    executor: BackgroundExecutor,
    /// Shared mutable state: settings snapshot, identifiers, and the pending event queue.
    state: Arc<Mutex<TelemetryState>>,
}
 35
/// Mutable state shared between the `Telemetry` handle and the tasks it spawns.
struct TelemetryState {
    /// Snapshot of the telemetry settings; refreshed whenever the `SettingsStore` global changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>,        // Per app launch
    /// Display name of the release channel, when one is registered globally.
    release_channel: Option<&'static str>,
    /// Application metadata (version, OS name, OS version) included in every upload.
    app_metadata: AppMetadata,
    /// CPU architecture string taken from `env::consts::ARCH`.
    architecture: &'static str,
    /// Events reported since the last flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when not armed.
    flush_events_task: Option<Task<()>>,
    /// Temp file that flushed events are appended to as JSON lines; only created in non-debug builds.
    log_file: Option<NamedTempFile>,
    /// Whether the authenticated user is staff; `None` until user info is set.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; reset on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Coalescer consulted by `log_edit_event` to decide when an edit period is reported.
    event_coalescer: EventCoalescer,
    /// Queue length at which an immediate flush is triggered; initialised from `MAX_QUEUE_LEN`.
    max_queue_size: usize,
}
 52
/// Default queue length that triggers an immediate flush (debug builds).
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Default queue length that triggers an immediate flush (non-debug builds).
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between the first queued event and the timed flush (debug builds).
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event and the timed flush (non-debug builds): five minutes.
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 64static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 65    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 66        .map(|s| s.as_bytes().into())
 67        .or_else(|| {
 68            env::var("ZED_CLIENT_CHECKSUM_SEED")
 69                .ok()
 70                .map(|s| s.as_bytes().into())
 71        })
 72});
 73
 74impl Telemetry {
 75    pub fn new(
 76        clock: Arc<dyn SystemClock>,
 77        client: Arc<HttpClientWithUrl>,
 78        cx: &mut AppContext,
 79    ) -> Arc<Self> {
 80        let release_channel =
 81            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
 82
 83        TelemetrySettings::register(cx);
 84
 85        let state = Arc::new(Mutex::new(TelemetryState {
 86            settings: *TelemetrySettings::get_global(cx),
 87            app_metadata: cx.app_metadata(),
 88            architecture: env::consts::ARCH,
 89            release_channel,
 90            installation_id: None,
 91            metrics_id: None,
 92            session_id: None,
 93            events_queue: Vec::new(),
 94            flush_events_task: None,
 95            log_file: None,
 96            is_staff: None,
 97            first_event_date_time: None,
 98            event_coalescer: EventCoalescer::new(clock.clone()),
 99            max_queue_size: MAX_QUEUE_LEN,
100        }));
101
102        #[cfg(not(debug_assertions))]
103        cx.background_executor()
104            .spawn({
105                let state = state.clone();
106                async move {
107                    if let Some(tempfile) =
108                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
109                    {
110                        state.lock().log_file = Some(tempfile);
111                    }
112                }
113            })
114            .detach();
115
116        cx.observe_global::<SettingsStore>({
117            let state = state.clone();
118
119            move |cx| {
120                let mut state = state.lock();
121                state.settings = *TelemetrySettings::get_global(cx);
122            }
123        })
124        .detach();
125
126        // TODO: Replace all hardware stuff with nested SystemSpecs json
127        let this = Arc::new(Self {
128            clock,
129            http_client: client,
130            executor: cx.background_executor().clone(),
131            state,
132        });
133
134        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
135        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
136        std::mem::forget(cx.on_app_quit({
137            let this = this.clone();
138            move |_| this.shutdown_telemetry()
139        }));
140
141        this
142    }
143
144    #[cfg(any(test, feature = "test-support"))]
145    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
146        Task::ready(())
147    }
148
149    // Skip calling this function in tests.
150    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
151    #[cfg(not(any(test, feature = "test-support")))]
152    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
153        self.report_app_event("close".to_string());
154        // TODO: close final edit period and make sure it's sent
155        Task::ready(())
156    }
157
158    pub fn log_file_path(&self) -> Option<PathBuf> {
159        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
160    }
161
162    pub fn start(
163        self: &Arc<Self>,
164        installation_id: Option<String>,
165        session_id: String,
166        cx: &mut AppContext,
167    ) {
168        let mut state = self.state.lock();
169        state.installation_id = installation_id.map(|id| id.into());
170        state.session_id = Some(session_id);
171        drop(state);
172
173        let this = self.clone();
174        cx.background_executor()
175            .spawn(async move {
176                let mut system = System::new_with_specifics(
177                    RefreshKind::new().with_cpu(CpuRefreshKind::everything()),
178                );
179
180                let refresh_kind = ProcessRefreshKind::new().with_cpu().with_memory();
181                let current_process = Pid::from_u32(std::process::id());
182                system.refresh_process_specifics(current_process, refresh_kind);
183
184                // Waiting some amount of time before the first query is important to get a reasonable value
185                // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
186                const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
187
188                loop {
189                    smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
190
191                    let current_process = Pid::from_u32(std::process::id());
192                    system.refresh_process_specifics(current_process, refresh_kind);
193                    let Some(process) = system.process(current_process) else {
194                        log::error!(
195                            "Failed to find own process {current_process:?} in system process table"
196                        );
197                        // TODO: Fire an error telemetry event
198                        return;
199                    };
200
201                    this.report_memory_event(process.memory(), process.virtual_memory());
202                    this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
203                }
204            })
205            .detach();
206    }
207
208    pub fn set_authenticated_user_info(
209        self: &Arc<Self>,
210        metrics_id: Option<String>,
211        is_staff: bool,
212    ) {
213        let mut state = self.state.lock();
214
215        if !state.settings.metrics {
216            return;
217        }
218
219        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
220        state.metrics_id.clone_from(&metrics_id);
221        state.is_staff = Some(is_staff);
222        drop(state);
223    }
224
225    pub fn report_editor_event(
226        self: &Arc<Self>,
227        file_extension: Option<String>,
228        vim_mode: bool,
229        operation: &'static str,
230        copilot_enabled: bool,
231        copilot_enabled_for_language: bool,
232    ) {
233        let event = Event::Editor(EditorEvent {
234            file_extension,
235            vim_mode,
236            operation: operation.into(),
237            copilot_enabled,
238            copilot_enabled_for_language,
239        });
240
241        self.report_event(event)
242    }
243
244    pub fn report_inline_completion_event(
245        self: &Arc<Self>,
246        provider: String,
247        suggestion_accepted: bool,
248        file_extension: Option<String>,
249    ) {
250        let event = Event::InlineCompletion(InlineCompletionEvent {
251            provider,
252            suggestion_accepted,
253            file_extension,
254        });
255
256        self.report_event(event)
257    }
258
259    pub fn report_assistant_event(
260        self: &Arc<Self>,
261        conversation_id: Option<String>,
262        kind: AssistantKind,
263        model: String,
264        response_latency: Option<Duration>,
265        error_message: Option<String>,
266    ) {
267        let event = Event::Assistant(AssistantEvent {
268            conversation_id,
269            kind,
270            model: model.to_string(),
271            response_latency,
272            error_message,
273        });
274
275        self.report_event(event)
276    }
277
278    pub fn report_call_event(
279        self: &Arc<Self>,
280        operation: &'static str,
281        room_id: Option<u64>,
282        channel_id: Option<ChannelId>,
283    ) {
284        let event = Event::Call(CallEvent {
285            operation: operation.to_string(),
286            room_id,
287            channel_id: channel_id.map(|cid| cid.0),
288        });
289
290        self.report_event(event)
291    }
292
293    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
294        let event = Event::Cpu(CpuEvent {
295            usage_as_percentage,
296            core_count,
297        });
298
299        self.report_event(event)
300    }
301
302    pub fn report_memory_event(
303        self: &Arc<Self>,
304        memory_in_bytes: u64,
305        virtual_memory_in_bytes: u64,
306    ) {
307        let event = Event::Memory(MemoryEvent {
308            memory_in_bytes,
309            virtual_memory_in_bytes,
310        });
311
312        self.report_event(event)
313    }
314
315    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
316        let event = Event::App(AppEvent { operation });
317
318        self.report_event(event.clone());
319
320        event
321    }
322
323    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
324        let event = Event::Setting(SettingEvent {
325            setting: setting.to_string(),
326            value,
327        });
328
329        self.report_event(event)
330    }
331
332    pub fn report_extension_event(self: &Arc<Self>, extension_id: Arc<str>, version: Arc<str>) {
333        self.report_event(Event::Extension(ExtensionEvent {
334            extension_id,
335            version,
336        }))
337    }
338
339    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
340        let mut state = self.state.lock();
341        let period_data = state.event_coalescer.log_event(environment);
342        drop(state);
343
344        if let Some((start, end, environment)) = period_data {
345            let event = Event::Edit(EditEvent {
346                duration: end.timestamp_millis() - start.timestamp_millis(),
347                environment: environment.to_string(),
348            });
349
350            self.report_event(event);
351        }
352    }
353
354    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
355        let event = Event::Action(ActionEvent {
356            source: source.to_string(),
357            action,
358        });
359
360        self.report_event(event)
361    }
362
363    fn report_event(self: &Arc<Self>, event: Event) {
364        let mut state = self.state.lock();
365
366        if !state.settings.metrics {
367            return;
368        }
369
370        if state.flush_events_task.is_none() {
371            let this = self.clone();
372            let executor = self.executor.clone();
373            state.flush_events_task = Some(self.executor.spawn(async move {
374                executor.timer(FLUSH_INTERVAL).await;
375                this.flush_events();
376            }));
377        }
378
379        let date_time = self.clock.utc_now();
380
381        let milliseconds_since_first_event = match state.first_event_date_time {
382            Some(first_event_date_time) => {
383                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
384            }
385            None => {
386                state.first_event_date_time = Some(date_time);
387                0
388            }
389        };
390
391        let signed_in = state.metrics_id.is_some();
392        state.events_queue.push(EventWrapper {
393            signed_in,
394            milliseconds_since_first_event,
395            event,
396        });
397
398        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
399            drop(state);
400            self.flush_events();
401        }
402    }
403
404    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
405        self.state.lock().metrics_id.clone()
406    }
407
408    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
409        self.state.lock().installation_id.clone()
410    }
411
412    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
413        self.state.lock().is_staff
414    }
415
416    pub fn flush_events(self: &Arc<Self>) {
417        let mut state = self.state.lock();
418        state.first_event_date_time = None;
419        let mut events = mem::take(&mut state.events_queue);
420        state.flush_events_task.take();
421        drop(state);
422        if events.is_empty() {
423            return;
424        }
425
426        if ZED_CLIENT_CHECKSUM_SEED.is_none() {
427            return;
428        };
429
430        let this = self.clone();
431        self.executor
432            .spawn(
433                async move {
434                    let mut json_bytes = Vec::new();
435
436                    if let Some(file) = &mut this.state.lock().log_file {
437                        let file = file.as_file_mut();
438                        for event in &mut events {
439                            json_bytes.clear();
440                            serde_json::to_writer(&mut json_bytes, event)?;
441                            file.write_all(&json_bytes)?;
442                            file.write_all(b"\n")?;
443                        }
444                    }
445
446                    {
447                        let state = this.state.lock();
448                        let request_body = EventRequestBody {
449                            installation_id: state.installation_id.as_deref().map(Into::into),
450                            session_id: state.session_id.clone(),
451                            is_staff: state.is_staff,
452                            app_version: state
453                                .app_metadata
454                                .app_version
455                                .unwrap_or_default()
456                                .to_string(),
457                            os_name: state.app_metadata.os_name.to_string(),
458                            os_version: state
459                                .app_metadata
460                                .os_version
461                                .map(|version| version.to_string()),
462                            architecture: state.architecture.to_string(),
463
464                            release_channel: state.release_channel.map(Into::into),
465                            events,
466                        };
467                        json_bytes.clear();
468                        serde_json::to_writer(&mut json_bytes, &request_body)?;
469                    }
470
471                    let Some(checksum) = calculate_json_checksum(&json_bytes) else {
472                        return Ok(());
473                    };
474
475                    let request = http::Request::builder()
476                        .method(Method::POST)
477                        .uri(
478                            this.http_client
479                                .build_zed_api_url("/telemetry/events", &[])?
480                                .as_ref(),
481                        )
482                        .header("Content-Type", "text/plain")
483                        .header("x-zed-checksum", checksum)
484                        .body(json_bytes.into());
485
486                    let response = this.http_client.send(request?).await?;
487                    if response.status() != 200 {
488                        log::error!("Failed to send events: HTTP {:?}", response.status());
489                    }
490                    anyhow::Ok(())
491                }
492                .log_err(),
493            )
494            .detach();
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use chrono::TimeZone;
502    use clock::FakeSystemClock;
503    use gpui::TestAppContext;
504    use http::FakeHttpClient;
505
506    #[gpui::test]
507    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
508        init_test(cx);
509        let clock = Arc::new(FakeSystemClock::new(
510            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
511        ));
512        let http = FakeHttpClient::with_200_response();
513        let installation_id = Some("installation_id".to_string());
514        let session_id = "session_id".to_string();
515
516        cx.update(|cx| {
517            let telemetry = Telemetry::new(clock.clone(), http, cx);
518
519            telemetry.state.lock().max_queue_size = 4;
520            telemetry.start(installation_id, session_id, cx);
521
522            assert!(is_empty_state(&telemetry));
523
524            let first_date_time = clock.utc_now();
525            let operation = "test".to_string();
526
527            let event = telemetry.report_app_event(operation.clone());
528            assert_eq!(
529                event,
530                Event::App(AppEvent {
531                    operation: operation.clone(),
532                })
533            );
534            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
535            assert!(telemetry.state.lock().flush_events_task.is_some());
536            assert_eq!(
537                telemetry.state.lock().first_event_date_time,
538                Some(first_date_time)
539            );
540
541            clock.advance(chrono::Duration::milliseconds(100));
542
543            let event = telemetry.report_app_event(operation.clone());
544            assert_eq!(
545                event,
546                Event::App(AppEvent {
547                    operation: operation.clone(),
548                })
549            );
550            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
551            assert!(telemetry.state.lock().flush_events_task.is_some());
552            assert_eq!(
553                telemetry.state.lock().first_event_date_time,
554                Some(first_date_time)
555            );
556
557            clock.advance(chrono::Duration::milliseconds(100));
558
559            let event = telemetry.report_app_event(operation.clone());
560            assert_eq!(
561                event,
562                Event::App(AppEvent {
563                    operation: operation.clone(),
564                })
565            );
566            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
567            assert!(telemetry.state.lock().flush_events_task.is_some());
568            assert_eq!(
569                telemetry.state.lock().first_event_date_time,
570                Some(first_date_time)
571            );
572
573            clock.advance(chrono::Duration::milliseconds(100));
574
575            // Adding a 4th event should cause a flush
576            let event = telemetry.report_app_event(operation.clone());
577            assert_eq!(
578                event,
579                Event::App(AppEvent {
580                    operation: operation.clone(),
581                })
582            );
583
584            assert!(is_empty_state(&telemetry));
585        });
586    }
587
588    #[gpui::test]
589    async fn test_telemetry_flush_on_flush_interval(
590        executor: BackgroundExecutor,
591        cx: &mut TestAppContext,
592    ) {
593        init_test(cx);
594        let clock = Arc::new(FakeSystemClock::new(
595            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
596        ));
597        let http = FakeHttpClient::with_200_response();
598        let installation_id = Some("installation_id".to_string());
599        let session_id = "session_id".to_string();
600
601        cx.update(|cx| {
602            let telemetry = Telemetry::new(clock.clone(), http, cx);
603            telemetry.state.lock().max_queue_size = 4;
604            telemetry.start(installation_id, session_id, cx);
605
606            assert!(is_empty_state(&telemetry));
607
608            let first_date_time = clock.utc_now();
609            let operation = "test".to_string();
610
611            let event = telemetry.report_app_event(operation.clone());
612            assert_eq!(
613                event,
614                Event::App(AppEvent {
615                    operation: operation.clone(),
616                })
617            );
618            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
619            assert!(telemetry.state.lock().flush_events_task.is_some());
620            assert_eq!(
621                telemetry.state.lock().first_event_date_time,
622                Some(first_date_time)
623            );
624
625            let duration = Duration::from_millis(1);
626
627            // Test 1 millisecond before the flush interval limit is met
628            executor.advance_clock(FLUSH_INTERVAL - duration);
629
630            assert!(!is_empty_state(&telemetry));
631
632            // Test the exact moment the flush interval limit is met
633            executor.advance_clock(duration);
634
635            assert!(is_empty_state(&telemetry));
636        });
637    }
638
639    // TODO:
640    // Test settings
    // Update FakeHttpClient to keep track of the number of requests and assert on it
642
643    fn init_test(cx: &mut TestAppContext) {
644        cx.update(|cx| {
645            let settings_store = SettingsStore::test(cx);
646            cx.set_global(settings_store);
647        });
648    }
649
650    fn is_empty_state(telemetry: &Telemetry) -> bool {
651        telemetry.state.lock().events_queue.is_empty()
652            && telemetry.state.lock().flush_events_task.is_none()
653            && telemetry.state.lock().first_event_date_time.is_none()
654    }
655}
656
657pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
658    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
659        return None;
660    };
661
662    let mut summer = Sha256::new();
663    summer.update(checksum_seed);
664    summer.update(&json);
665    summer.update(checksum_seed);
666    let mut checksum = String::new();
667    for byte in summer.finalize().as_slice() {
668        use std::fmt::Write;
669        write!(&mut checksum, "{:02x}", byte).unwrap();
670    }
671
672    Some(checksum)
673}