telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  8use once_cell::sync::Lazy;
  9use parking_lot::Mutex;
 10use release_channel::ReleaseChannel;
 11use settings::{Settings, SettingsStore};
 12use sha2::{Digest, Sha256};
 13use std::io::Write;
 14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 15use sysinfo::{CpuRefreshKind, MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
 16use telemetry_events::{
 17    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
 18    EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, MemoryEvent,
 19    SettingEvent,
 20};
 21use tempfile::NamedTempFile;
 22use util::http::{self, HttpClient, HttpClientWithUrl, Method};
 23#[cfg(not(debug_assertions))]
 24use util::ResultExt;
 25use util::TryFutureExt;
 26
 27use self::event_coalescer::EventCoalescer;
 28
/// Client-side telemetry collector.
///
/// Events are queued in the shared [`TelemetryState`] and later serialized and
/// posted to the `/telemetry/events` endpoint by `flush_events`.
pub struct Telemetry {
    /// Source of the timestamps attached to queued events.
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to build the telemetry URL and send event batches.
    http_client: Arc<HttpClientWithUrl>,
    /// Executor on which the flush timer and the upload tasks are spawned.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with the spawned tasks and observers.
    state: Arc<Mutex<TelemetryState>>,
}
 35
/// Mutable state behind [`Telemetry`], always accessed through its mutex.
struct TelemetryState {
    /// Copy of the global telemetry settings, refreshed whenever the settings store changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>,        // Per app launch
    /// Display name of the release channel, when one is registered globally.
    release_channel: Option<&'static str>,
    /// Application metadata captured from the app context at construction time.
    app_metadata: AppMetadata,
    /// CPU architecture string taken from `env::consts::ARCH`.
    architecture: &'static str,
    /// Events recorded since the last flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`, if one is armed.
    flush_events_task: Option<Task<()>>,
    /// Temporary file that flushed events are appended to as JSON lines
    /// (only created in builds without debug assertions).
    log_file: Option<NamedTempFile>,
    /// Staff flag recorded by `set_authenticated_user_info`.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current queue; cleared on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Coalesces edit activity into periods that are reported as single edit events.
    event_coalescer: EventCoalescer,
    /// Queue length at which a flush is triggered (once an installation id is known).
    max_queue_size: usize,
}
 52
/// Default value for `TelemetryState::max_queue_size` in builds with debug assertions.
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Default value for `TelemetryState::max_queue_size` in builds without debug assertions.
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;
 58
/// Delay between the first queued event and the timed flush, in builds with debug assertions.
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event and the timed flush, in builds without debug
/// assertions (five minutes).
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 64static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 65    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 66        .map(|s| s.as_bytes().into())
 67        .or_else(|| {
 68            env::var("ZED_CLIENT_CHECKSUM_SEED")
 69                .ok()
 70                .map(|s| s.as_bytes().into())
 71        })
 72});
 73
 74impl Telemetry {
 75    pub fn new(
 76        clock: Arc<dyn SystemClock>,
 77        client: Arc<HttpClientWithUrl>,
 78        cx: &mut AppContext,
 79    ) -> Arc<Self> {
 80        let release_channel =
 81            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
 82
 83        TelemetrySettings::register(cx);
 84
 85        let state = Arc::new(Mutex::new(TelemetryState {
 86            settings: *TelemetrySettings::get_global(cx),
 87            app_metadata: cx.app_metadata(),
 88            architecture: env::consts::ARCH,
 89            release_channel,
 90            installation_id: None,
 91            metrics_id: None,
 92            session_id: None,
 93            events_queue: Vec::new(),
 94            flush_events_task: None,
 95            log_file: None,
 96            is_staff: None,
 97            first_event_date_time: None,
 98            event_coalescer: EventCoalescer::new(clock.clone()),
 99            max_queue_size: MAX_QUEUE_LEN,
100        }));
101
102        #[cfg(not(debug_assertions))]
103        cx.background_executor()
104            .spawn({
105                let state = state.clone();
106                async move {
107                    if let Some(tempfile) =
108                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
109                    {
110                        state.lock().log_file = Some(tempfile);
111                    }
112                }
113            })
114            .detach();
115
116        cx.observe_global::<SettingsStore>({
117            let state = state.clone();
118
119            move |cx| {
120                let mut state = state.lock();
121                state.settings = *TelemetrySettings::get_global(cx);
122            }
123        })
124        .detach();
125
126        // TODO: Replace all hardware stuff with nested SystemSpecs json
127        let this = Arc::new(Self {
128            clock,
129            http_client: client,
130            executor: cx.background_executor().clone(),
131            state,
132        });
133
134        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
135        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
136        std::mem::forget(cx.on_app_quit({
137            let this = this.clone();
138            move |_| this.shutdown_telemetry()
139        }));
140
141        this
142    }
143
144    #[cfg(any(test, feature = "test-support"))]
145    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
146        Task::ready(())
147    }
148
149    // Skip calling this function in tests.
150    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
151    #[cfg(not(any(test, feature = "test-support")))]
152    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
153        self.report_app_event("close".to_string());
154        // TODO: close final edit period and make sure it's sent
155        Task::ready(())
156    }
157
158    pub fn log_file_path(&self) -> Option<PathBuf> {
159        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
160    }
161
162    pub fn start(
163        self: &Arc<Self>,
164        installation_id: Option<String>,
165        session_id: String,
166        cx: &mut AppContext,
167    ) {
168        let mut state = self.state.lock();
169        state.installation_id = installation_id.map(|id| id.into());
170        state.session_id = Some(session_id);
171        drop(state);
172
173        let this = self.clone();
174        cx.spawn(|_| async move {
175            // Avoiding calling `System::new_all()`, as there have been crashes related to it
176            let refresh_kind = RefreshKind::new()
177                .with_memory(MemoryRefreshKind::everything()) // For memory usage
178                .with_processes(ProcessRefreshKind::everything()) // For process usage
179                .with_cpu(CpuRefreshKind::everything()); // For core count
180
181            let mut system = System::new_with_specifics(refresh_kind);
182
183            // Avoiding calling `refresh_all()`, just update what we need
184            system.refresh_specifics(refresh_kind);
185
186            // Waiting some amount of time before the first query is important to get a reasonable value
187            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
188            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
189
190            loop {
191                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
192
193                system.refresh_specifics(refresh_kind);
194
195                let current_process = Pid::from_u32(std::process::id());
196                let Some(process) = system.processes().get(&current_process) else {
197                    let process = current_process;
198                    log::error!("Failed to find own process {process:?} in system process table");
199                    // TODO: Fire an error telemetry event
200                    return;
201                };
202
203                this.report_memory_event(process.memory(), process.virtual_memory());
204                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
205            }
206        })
207        .detach();
208    }
209
210    pub fn set_authenticated_user_info(
211        self: &Arc<Self>,
212        metrics_id: Option<String>,
213        is_staff: bool,
214    ) {
215        let mut state = self.state.lock();
216
217        if !state.settings.metrics {
218            return;
219        }
220
221        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
222        state.metrics_id = metrics_id.clone();
223        state.is_staff = Some(is_staff);
224        drop(state);
225    }
226
227    pub fn report_editor_event(
228        self: &Arc<Self>,
229        file_extension: Option<String>,
230        vim_mode: bool,
231        operation: &'static str,
232        copilot_enabled: bool,
233        copilot_enabled_for_language: bool,
234    ) {
235        let event = Event::Editor(EditorEvent {
236            file_extension,
237            vim_mode,
238            operation: operation.into(),
239            copilot_enabled,
240            copilot_enabled_for_language,
241        });
242
243        self.report_event(event)
244    }
245
246    pub fn report_copilot_event(
247        self: &Arc<Self>,
248        suggestion_id: Option<String>,
249        suggestion_accepted: bool,
250        file_extension: Option<String>,
251    ) {
252        let event = Event::Copilot(CopilotEvent {
253            suggestion_id,
254            suggestion_accepted,
255            file_extension,
256        });
257
258        self.report_event(event)
259    }
260
261    pub fn report_assistant_event(
262        self: &Arc<Self>,
263        conversation_id: Option<String>,
264        kind: AssistantKind,
265        model: String,
266    ) {
267        let event = Event::Assistant(AssistantEvent {
268            conversation_id,
269            kind,
270            model: model.to_string(),
271        });
272
273        self.report_event(event)
274    }
275
276    pub fn report_call_event(
277        self: &Arc<Self>,
278        operation: &'static str,
279        room_id: Option<u64>,
280        channel_id: Option<ChannelId>,
281    ) {
282        let event = Event::Call(CallEvent {
283            operation: operation.to_string(),
284            room_id,
285            channel_id: channel_id.map(|cid| cid.0),
286        });
287
288        self.report_event(event)
289    }
290
291    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
292        let event = Event::Cpu(CpuEvent {
293            usage_as_percentage,
294            core_count,
295        });
296
297        self.report_event(event)
298    }
299
300    pub fn report_memory_event(
301        self: &Arc<Self>,
302        memory_in_bytes: u64,
303        virtual_memory_in_bytes: u64,
304    ) {
305        let event = Event::Memory(MemoryEvent {
306            memory_in_bytes,
307            virtual_memory_in_bytes,
308        });
309
310        self.report_event(event)
311    }
312
313    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
314        let event = Event::App(AppEvent { operation });
315
316        self.report_event(event.clone());
317
318        event
319    }
320
321    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
322        let event = Event::Setting(SettingEvent {
323            setting: setting.to_string(),
324            value,
325        });
326
327        self.report_event(event)
328    }
329
330    pub fn report_extension_event(self: &Arc<Self>, extension_id: Arc<str>, version: Arc<str>) {
331        self.report_event(Event::Extension(ExtensionEvent {
332            extension_id,
333            version,
334        }))
335    }
336
337    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
338        let mut state = self.state.lock();
339        let period_data = state.event_coalescer.log_event(environment);
340        drop(state);
341
342        if let Some((start, end, environment)) = period_data {
343            let event = Event::Edit(EditEvent {
344                duration: end.timestamp_millis() - start.timestamp_millis(),
345                environment: environment.to_string(),
346            });
347
348            self.report_event(event);
349        }
350    }
351
352    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
353        let event = Event::Action(ActionEvent {
354            source: source.to_string(),
355            action,
356        });
357
358        self.report_event(event)
359    }
360
361    fn report_event(self: &Arc<Self>, event: Event) {
362        let mut state = self.state.lock();
363
364        if !state.settings.metrics {
365            return;
366        }
367
368        if state.flush_events_task.is_none() {
369            let this = self.clone();
370            let executor = self.executor.clone();
371            state.flush_events_task = Some(self.executor.spawn(async move {
372                executor.timer(FLUSH_INTERVAL).await;
373                this.flush_events();
374            }));
375        }
376
377        let date_time = self.clock.utc_now();
378
379        let milliseconds_since_first_event = match state.first_event_date_time {
380            Some(first_event_date_time) => {
381                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
382            }
383            None => {
384                state.first_event_date_time = Some(date_time);
385                0
386            }
387        };
388
389        let signed_in = state.metrics_id.is_some();
390        state.events_queue.push(EventWrapper {
391            signed_in,
392            milliseconds_since_first_event,
393            event,
394        });
395
396        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
397            drop(state);
398            self.flush_events();
399        }
400    }
401
402    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
403        self.state.lock().metrics_id.clone()
404    }
405
406    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
407        self.state.lock().installation_id.clone()
408    }
409
410    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
411        self.state.lock().is_staff
412    }
413
414    pub fn flush_events(self: &Arc<Self>) {
415        let mut state = self.state.lock();
416        state.first_event_date_time = None;
417        let mut events = mem::take(&mut state.events_queue);
418        state.flush_events_task.take();
419        drop(state);
420        if events.is_empty() {
421            return;
422        }
423
424        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
425            return;
426        };
427
428        let this = self.clone();
429        self.executor
430            .spawn(
431                async move {
432                    let mut json_bytes = Vec::new();
433
434                    if let Some(file) = &mut this.state.lock().log_file {
435                        let file = file.as_file_mut();
436                        for event in &mut events {
437                            json_bytes.clear();
438                            serde_json::to_writer(&mut json_bytes, event)?;
439                            file.write_all(&json_bytes)?;
440                            file.write_all(b"\n")?;
441                        }
442                    }
443
444                    {
445                        let state = this.state.lock();
446                        let request_body = EventRequestBody {
447                            installation_id: state.installation_id.as_deref().map(Into::into),
448                            session_id: state.session_id.clone(),
449                            is_staff: state.is_staff,
450                            app_version: state
451                                .app_metadata
452                                .app_version
453                                .unwrap_or_default()
454                                .to_string(),
455                            os_name: state.app_metadata.os_name.to_string(),
456                            os_version: state
457                                .app_metadata
458                                .os_version
459                                .map(|version| version.to_string()),
460                            architecture: state.architecture.to_string(),
461
462                            release_channel: state.release_channel.map(Into::into),
463                            events,
464                        };
465                        json_bytes.clear();
466                        serde_json::to_writer(&mut json_bytes, &request_body)?;
467                    }
468
469                    let mut summer = Sha256::new();
470                    summer.update(checksum_seed);
471                    summer.update(&json_bytes);
472                    summer.update(checksum_seed);
473                    let mut checksum = String::new();
474                    for byte in summer.finalize().as_slice() {
475                        use std::fmt::Write;
476                        write!(&mut checksum, "{:02x}", byte).unwrap();
477                    }
478
479                    let request = http::Request::builder()
480                        .method(Method::POST)
481                        .uri(
482                            this.http_client
483                                .build_zed_api_url("/telemetry/events", &[])?
484                                .as_ref(),
485                        )
486                        .header("Content-Type", "text/plain")
487                        .header("x-zed-checksum", checksum)
488                        .body(json_bytes.into());
489
490                    let response = this.http_client.send(request?).await?;
491                    if response.status() != 200 {
492                        log::error!("Failed to send events: HTTP {:?}", response.status());
493                    }
494                    anyhow::Ok(())
495                }
496                .log_err(),
497            )
498            .detach();
499    }
500}
501
502#[cfg(test)]
503mod tests {
504    use super::*;
505    use chrono::TimeZone;
506    use clock::FakeSystemClock;
507    use gpui::TestAppContext;
508    use util::http::FakeHttpClient;
509
510    #[gpui::test]
511    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
512        init_test(cx);
513        let clock = Arc::new(FakeSystemClock::new(
514            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
515        ));
516        let http = FakeHttpClient::with_200_response();
517        let installation_id = Some("installation_id".to_string());
518        let session_id = "session_id".to_string();
519
520        cx.update(|cx| {
521            let telemetry = Telemetry::new(clock.clone(), http, cx);
522
523            telemetry.state.lock().max_queue_size = 4;
524            telemetry.start(installation_id, session_id, cx);
525
526            assert!(is_empty_state(&telemetry));
527
528            let first_date_time = clock.utc_now();
529            let operation = "test".to_string();
530
531            let event = telemetry.report_app_event(operation.clone());
532            assert_eq!(
533                event,
534                Event::App(AppEvent {
535                    operation: operation.clone(),
536                })
537            );
538            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
539            assert!(telemetry.state.lock().flush_events_task.is_some());
540            assert_eq!(
541                telemetry.state.lock().first_event_date_time,
542                Some(first_date_time)
543            );
544
545            clock.advance(chrono::Duration::milliseconds(100));
546
547            let event = telemetry.report_app_event(operation.clone());
548            assert_eq!(
549                event,
550                Event::App(AppEvent {
551                    operation: operation.clone(),
552                })
553            );
554            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
555            assert!(telemetry.state.lock().flush_events_task.is_some());
556            assert_eq!(
557                telemetry.state.lock().first_event_date_time,
558                Some(first_date_time)
559            );
560
561            clock.advance(chrono::Duration::milliseconds(100));
562
563            let event = telemetry.report_app_event(operation.clone());
564            assert_eq!(
565                event,
566                Event::App(AppEvent {
567                    operation: operation.clone(),
568                })
569            );
570            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
571            assert!(telemetry.state.lock().flush_events_task.is_some());
572            assert_eq!(
573                telemetry.state.lock().first_event_date_time,
574                Some(first_date_time)
575            );
576
577            clock.advance(chrono::Duration::milliseconds(100));
578
579            // Adding a 4th event should cause a flush
580            let event = telemetry.report_app_event(operation.clone());
581            assert_eq!(
582                event,
583                Event::App(AppEvent {
584                    operation: operation.clone(),
585                })
586            );
587
588            assert!(is_empty_state(&telemetry));
589        });
590    }
591
592    #[gpui::test]
593    async fn test_telemetry_flush_on_flush_interval(
594        executor: BackgroundExecutor,
595        cx: &mut TestAppContext,
596    ) {
597        init_test(cx);
598        let clock = Arc::new(FakeSystemClock::new(
599            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
600        ));
601        let http = FakeHttpClient::with_200_response();
602        let installation_id = Some("installation_id".to_string());
603        let session_id = "session_id".to_string();
604
605        cx.update(|cx| {
606            let telemetry = Telemetry::new(clock.clone(), http, cx);
607            telemetry.state.lock().max_queue_size = 4;
608            telemetry.start(installation_id, session_id, cx);
609
610            assert!(is_empty_state(&telemetry));
611
612            let first_date_time = clock.utc_now();
613            let operation = "test".to_string();
614
615            let event = telemetry.report_app_event(operation.clone());
616            assert_eq!(
617                event,
618                Event::App(AppEvent {
619                    operation: operation.clone(),
620                })
621            );
622            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
623            assert!(telemetry.state.lock().flush_events_task.is_some());
624            assert_eq!(
625                telemetry.state.lock().first_event_date_time,
626                Some(first_date_time)
627            );
628
629            let duration = Duration::from_millis(1);
630
631            // Test 1 millisecond before the flush interval limit is met
632            executor.advance_clock(FLUSH_INTERVAL - duration);
633
634            assert!(!is_empty_state(&telemetry));
635
636            // Test the exact moment the flush interval limit is met
637            executor.advance_clock(duration);
638
639            assert!(is_empty_state(&telemetry));
640        });
641    }
642
643    // TODO:
644    // Test settings
    // Update FakeHttpClient to keep track of the number of requests and assert on it
646
647    fn init_test(cx: &mut TestAppContext) {
648        cx.update(|cx| {
649            let settings_store = SettingsStore::test(cx);
650            cx.set_global(settings_store);
651        });
652    }
653
654    fn is_empty_state(telemetry: &Telemetry) -> bool {
655        telemetry.state.lock().events_queue.is_empty()
656            && telemetry.state.lock().flush_events_task.is_none()
657            && telemetry.state.lock().first_event_date_time.is_none()
658    }
659}