telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  8use once_cell::sync::Lazy;
  9use parking_lot::Mutex;
 10use release_channel::ReleaseChannel;
 11use settings::{Settings, SettingsStore};
 12use sha2::{Digest, Sha256};
 13use std::io::Write;
 14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 15use sysinfo::{
 16    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 17};
 18use telemetry_events::{
 19    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
 20    EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 21};
 22use tempfile::NamedTempFile;
 23use util::http::{self, HttpClient, HttpClientWithUrl, Method};
 24#[cfg(not(debug_assertions))]
 25use util::ResultExt;
 26use util::TryFutureExt;
 27
 28use self::event_coalescer::EventCoalescer;
 29
 30pub struct Telemetry {
 31    clock: Arc<dyn SystemClock>,
 32    http_client: Arc<HttpClientWithUrl>,
 33    executor: BackgroundExecutor,
 34    state: Arc<Mutex<TelemetryState>>,
 35}
 36
 37struct TelemetryState {
 38    settings: TelemetrySettings,
 39    metrics_id: Option<Arc<str>>,      // Per logged-in user
 40    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 41    session_id: Option<String>,        // Per app launch
 42    release_channel: Option<&'static str>,
 43    app_metadata: AppMetadata,
 44    architecture: &'static str,
 45    events_queue: Vec<EventWrapper>,
 46    flush_events_task: Option<Task<()>>,
 47    log_file: Option<NamedTempFile>,
 48    is_staff: Option<bool>,
 49    first_event_date_time: Option<DateTime<Utc>>,
 50    event_coalescer: EventCoalescer,
 51    max_queue_size: usize,
 52}
 53
 54#[cfg(debug_assertions)]
 55const MAX_QUEUE_LEN: usize = 5;
 56
 57#[cfg(not(debug_assertions))]
 58const MAX_QUEUE_LEN: usize = 50;
 59
 60#[cfg(debug_assertions)]
 61const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 62
 63#[cfg(not(debug_assertions))]
 64const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 65static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 66    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 67        .map(|s| s.as_bytes().into())
 68        .or_else(|| {
 69            env::var("ZED_CLIENT_CHECKSUM_SEED")
 70                .ok()
 71                .map(|s| s.as_bytes().into())
 72        })
 73});
 74
 75impl Telemetry {
 76    pub fn new(
 77        clock: Arc<dyn SystemClock>,
 78        client: Arc<HttpClientWithUrl>,
 79        cx: &mut AppContext,
 80    ) -> Arc<Self> {
 81        let release_channel =
 82            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
 83
 84        TelemetrySettings::register(cx);
 85
 86        let state = Arc::new(Mutex::new(TelemetryState {
 87            settings: TelemetrySettings::get_global(cx).clone(),
 88            app_metadata: cx.app_metadata(),
 89            architecture: env::consts::ARCH,
 90            release_channel,
 91            installation_id: None,
 92            metrics_id: None,
 93            session_id: None,
 94            events_queue: Vec::new(),
 95            flush_events_task: None,
 96            log_file: None,
 97            is_staff: None,
 98            first_event_date_time: None,
 99            event_coalescer: EventCoalescer::new(clock.clone()),
100            max_queue_size: MAX_QUEUE_LEN,
101        }));
102
103        #[cfg(not(debug_assertions))]
104        cx.background_executor()
105            .spawn({
106                let state = state.clone();
107                async move {
108                    if let Some(tempfile) =
109                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
110                    {
111                        state.lock().log_file = Some(tempfile);
112                    }
113                }
114            })
115            .detach();
116
117        cx.observe_global::<SettingsStore>({
118            let state = state.clone();
119
120            move |cx| {
121                let mut state = state.lock();
122                state.settings = TelemetrySettings::get_global(cx).clone();
123            }
124        })
125        .detach();
126
127        // TODO: Replace all hardware stuff with nested SystemSpecs json
128        let this = Arc::new(Self {
129            clock,
130            http_client: client,
131            executor: cx.background_executor().clone(),
132            state,
133        });
134
135        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
136        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
137        std::mem::forget(cx.on_app_quit({
138            let this = this.clone();
139            move |_| this.shutdown_telemetry()
140        }));
141
142        this
143    }
144
145    #[cfg(any(test, feature = "test-support"))]
146    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
147        Task::ready(())
148    }
149
150    // Skip calling this function in tests.
151    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
152    #[cfg(not(any(test, feature = "test-support")))]
153    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
154        self.report_app_event("close".to_string());
155        // TODO: close final edit period and make sure it's sent
156        Task::ready(())
157    }
158
159    pub fn log_file_path(&self) -> Option<PathBuf> {
160        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
161    }
162
163    pub fn start(
164        self: &Arc<Self>,
165        installation_id: Option<String>,
166        session_id: String,
167        cx: &mut AppContext,
168    ) {
169        let mut state = self.state.lock();
170        state.installation_id = installation_id.map(|id| id.into());
171        state.session_id = Some(session_id.into());
172        drop(state);
173
174        let this = self.clone();
175        cx.spawn(|_| async move {
176            // Avoiding calling `System::new_all()`, as there have been crashes related to it
177            let refresh_kind = RefreshKind::new()
178                .with_memory() // For memory usage
179                .with_processes(ProcessRefreshKind::everything()) // For process usage
180                .with_cpu(CpuRefreshKind::everything()); // For core count
181
182            let mut system = System::new_with_specifics(refresh_kind);
183
184            // Avoiding calling `refresh_all()`, just update what we need
185            system.refresh_specifics(refresh_kind);
186
187            // Waiting some amount of time before the first query is important to get a reasonable value
188            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
189            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
190
191            loop {
192                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
193
194                system.refresh_specifics(refresh_kind);
195
196                let current_process = Pid::from_u32(std::process::id());
197                let Some(process) = system.processes().get(&current_process) else {
198                    let process = current_process;
199                    log::error!("Failed to find own process {process:?} in system process table");
200                    // TODO: Fire an error telemetry event
201                    return;
202                };
203
204                this.report_memory_event(process.memory(), process.virtual_memory());
205                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
206            }
207        })
208        .detach();
209    }
210
211    pub fn set_authenticated_user_info(
212        self: &Arc<Self>,
213        metrics_id: Option<String>,
214        is_staff: bool,
215    ) {
216        let mut state = self.state.lock();
217
218        if !state.settings.metrics {
219            return;
220        }
221
222        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
223        state.metrics_id = metrics_id.clone();
224        state.is_staff = Some(is_staff);
225        drop(state);
226    }
227
228    pub fn report_editor_event(
229        self: &Arc<Self>,
230        file_extension: Option<String>,
231        vim_mode: bool,
232        operation: &'static str,
233        copilot_enabled: bool,
234        copilot_enabled_for_language: bool,
235    ) {
236        let event = Event::Editor(EditorEvent {
237            file_extension,
238            vim_mode,
239            operation: operation.into(),
240            copilot_enabled,
241            copilot_enabled_for_language,
242        });
243
244        self.report_event(event)
245    }
246
247    pub fn report_copilot_event(
248        self: &Arc<Self>,
249        suggestion_id: Option<String>,
250        suggestion_accepted: bool,
251        file_extension: Option<String>,
252    ) {
253        let event = Event::Copilot(CopilotEvent {
254            suggestion_id,
255            suggestion_accepted,
256            file_extension,
257        });
258
259        self.report_event(event)
260    }
261
262    pub fn report_assistant_event(
263        self: &Arc<Self>,
264        conversation_id: Option<String>,
265        kind: AssistantKind,
266        model: &'static str,
267    ) {
268        let event = Event::Assistant(AssistantEvent {
269            conversation_id,
270            kind,
271            model: model.to_string(),
272        });
273
274        self.report_event(event)
275    }
276
277    pub fn report_call_event(
278        self: &Arc<Self>,
279        operation: &'static str,
280        room_id: Option<u64>,
281        channel_id: Option<u64>,
282    ) {
283        let event = Event::Call(CallEvent {
284            operation: operation.to_string(),
285            room_id,
286            channel_id,
287        });
288
289        self.report_event(event)
290    }
291
292    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
293        let event = Event::Cpu(CpuEvent {
294            usage_as_percentage,
295            core_count,
296        });
297
298        self.report_event(event)
299    }
300
301    pub fn report_memory_event(
302        self: &Arc<Self>,
303        memory_in_bytes: u64,
304        virtual_memory_in_bytes: u64,
305    ) {
306        let event = Event::Memory(MemoryEvent {
307            memory_in_bytes,
308            virtual_memory_in_bytes,
309        });
310
311        self.report_event(event)
312    }
313
314    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
315        let event = Event::App(AppEvent { operation });
316
317        self.report_event(event.clone());
318
319        event
320    }
321
322    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
323        let event = Event::Setting(SettingEvent {
324            setting: setting.to_string(),
325            value,
326        });
327
328        self.report_event(event)
329    }
330
331    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
332        let mut state = self.state.lock();
333        let period_data = state.event_coalescer.log_event(environment);
334        drop(state);
335
336        if let Some((start, end, environment)) = period_data {
337            let event = Event::Edit(EditEvent {
338                duration: end.timestamp_millis() - start.timestamp_millis(),
339                environment: environment.to_string(),
340            });
341
342            self.report_event(event);
343        }
344    }
345
346    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
347        let event = Event::Action(ActionEvent {
348            source: source.to_string(),
349            action,
350        });
351
352        self.report_event(event)
353    }
354
355    fn report_event(self: &Arc<Self>, event: Event) {
356        let mut state = self.state.lock();
357
358        if !state.settings.metrics {
359            return;
360        }
361
362        if state.flush_events_task.is_none() {
363            let this = self.clone();
364            let executor = self.executor.clone();
365            state.flush_events_task = Some(self.executor.spawn(async move {
366                executor.timer(FLUSH_INTERVAL).await;
367                this.flush_events();
368            }));
369        }
370
371        let date_time = self.clock.utc_now();
372
373        let milliseconds_since_first_event = match state.first_event_date_time {
374            Some(first_event_date_time) => {
375                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
376            }
377            None => {
378                state.first_event_date_time = Some(date_time);
379                0
380            }
381        };
382
383        let signed_in = state.metrics_id.is_some();
384        state.events_queue.push(EventWrapper {
385            signed_in,
386            milliseconds_since_first_event,
387            event,
388        });
389
390        if state.installation_id.is_some() {
391            if state.events_queue.len() >= state.max_queue_size {
392                drop(state);
393                self.flush_events();
394            }
395        }
396    }
397
398    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
399        self.state.lock().metrics_id.clone()
400    }
401
402    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
403        self.state.lock().installation_id.clone()
404    }
405
406    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
407        self.state.lock().is_staff
408    }
409
410    pub fn flush_events(self: &Arc<Self>) {
411        let mut state = self.state.lock();
412        state.first_event_date_time = None;
413        let mut events = mem::take(&mut state.events_queue);
414        state.flush_events_task.take();
415        drop(state);
416        if events.is_empty() {
417            return;
418        }
419
420        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
421            return;
422        };
423
424        let this = self.clone();
425        self.executor
426            .spawn(
427                async move {
428                    let mut json_bytes = Vec::new();
429
430                    if let Some(file) = &mut this.state.lock().log_file {
431                        let file = file.as_file_mut();
432                        for event in &mut events {
433                            json_bytes.clear();
434                            serde_json::to_writer(&mut json_bytes, event)?;
435                            file.write_all(&json_bytes)?;
436                            file.write(b"\n")?;
437                        }
438                    }
439
440                    {
441                        let state = this.state.lock();
442                        let request_body = EventRequestBody {
443                            installation_id: state.installation_id.as_deref().map(Into::into),
444                            session_id: state.session_id.clone(),
445                            is_staff: state.is_staff.clone(),
446                            app_version: state
447                                .app_metadata
448                                .app_version
449                                .unwrap_or_default()
450                                .to_string(),
451                            os_name: state.app_metadata.os_name.to_string(),
452                            os_version: state
453                                .app_metadata
454                                .os_version
455                                .map(|version| version.to_string()),
456                            architecture: state.architecture.to_string(),
457
458                            release_channel: state.release_channel.map(Into::into),
459                            events,
460                        };
461                        json_bytes.clear();
462                        serde_json::to_writer(&mut json_bytes, &request_body)?;
463                    }
464
465                    let mut summer = Sha256::new();
466                    summer.update(checksum_seed);
467                    summer.update(&json_bytes);
468                    summer.update(checksum_seed);
469                    let mut checksum = String::new();
470                    for byte in summer.finalize().as_slice() {
471                        use std::fmt::Write;
472                        write!(&mut checksum, "{:02x}", byte).unwrap();
473                    }
474
475                    let request = http::Request::builder()
476                        .method(Method::POST)
477                        .uri(this.http_client.build_zed_api_url("/telemetry/events"))
478                        .header("Content-Type", "text/plain")
479                        .header("x-zed-checksum", checksum)
480                        .body(json_bytes.into());
481
482                    let response = this.http_client.send(request?).await?;
483                    if response.status() != 200 {
484                        log::error!("Failed to send events: HTTP {:?}", response.status());
485                    }
486                    anyhow::Ok(())
487                }
488                .log_err(),
489            )
490            .detach();
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use chrono::TimeZone;
498    use clock::FakeSystemClock;
499    use gpui::TestAppContext;
500    use util::http::FakeHttpClient;
501
502    #[gpui::test]
503    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
504        init_test(cx);
505        let clock = Arc::new(FakeSystemClock::new(
506            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
507        ));
508        let http = FakeHttpClient::with_200_response();
509        let installation_id = Some("installation_id".to_string());
510        let session_id = "session_id".to_string();
511
512        cx.update(|cx| {
513            let telemetry = Telemetry::new(clock.clone(), http, cx);
514
515            telemetry.state.lock().max_queue_size = 4;
516            telemetry.start(installation_id, session_id, cx);
517
518            assert!(is_empty_state(&telemetry));
519
520            let first_date_time = clock.utc_now();
521            let operation = "test".to_string();
522
523            let event = telemetry.report_app_event(operation.clone());
524            assert_eq!(
525                event,
526                Event::App(AppEvent {
527                    operation: operation.clone(),
528                })
529            );
530            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
531            assert!(telemetry.state.lock().flush_events_task.is_some());
532            assert_eq!(
533                telemetry.state.lock().first_event_date_time,
534                Some(first_date_time)
535            );
536
537            clock.advance(chrono::Duration::milliseconds(100));
538
539            let event = telemetry.report_app_event(operation.clone());
540            assert_eq!(
541                event,
542                Event::App(AppEvent {
543                    operation: operation.clone(),
544                })
545            );
546            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
547            assert!(telemetry.state.lock().flush_events_task.is_some());
548            assert_eq!(
549                telemetry.state.lock().first_event_date_time,
550                Some(first_date_time)
551            );
552
553            clock.advance(chrono::Duration::milliseconds(100));
554
555            let event = telemetry.report_app_event(operation.clone());
556            assert_eq!(
557                event,
558                Event::App(AppEvent {
559                    operation: operation.clone(),
560                })
561            );
562            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
563            assert!(telemetry.state.lock().flush_events_task.is_some());
564            assert_eq!(
565                telemetry.state.lock().first_event_date_time,
566                Some(first_date_time)
567            );
568
569            clock.advance(chrono::Duration::milliseconds(100));
570
571            // Adding a 4th event should cause a flush
572            let event = telemetry.report_app_event(operation.clone());
573            assert_eq!(
574                event,
575                Event::App(AppEvent {
576                    operation: operation.clone(),
577                })
578            );
579
580            assert!(is_empty_state(&telemetry));
581        });
582    }
583
584    #[gpui::test]
585    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
586        init_test(cx);
587        let clock = Arc::new(FakeSystemClock::new(
588            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
589        ));
590        let http = FakeHttpClient::with_200_response();
591        let installation_id = Some("installation_id".to_string());
592        let session_id = "session_id".to_string();
593
594        cx.update(|cx| {
595            let telemetry = Telemetry::new(clock.clone(), http, cx);
596            telemetry.state.lock().max_queue_size = 4;
597            telemetry.start(installation_id, session_id, cx);
598
599            assert!(is_empty_state(&telemetry));
600
601            let first_date_time = clock.utc_now();
602            let operation = "test".to_string();
603
604            let event = telemetry.report_app_event(operation.clone());
605            assert_eq!(
606                event,
607                Event::App(AppEvent {
608                    operation: operation.clone(),
609                })
610            );
611            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
612            assert!(telemetry.state.lock().flush_events_task.is_some());
613            assert_eq!(
614                telemetry.state.lock().first_event_date_time,
615                Some(first_date_time)
616            );
617
618            let duration = Duration::from_millis(1);
619
620            // Test 1 millisecond before the flush interval limit is met
621            executor.advance_clock(FLUSH_INTERVAL - duration);
622
623            assert!(!is_empty_state(&telemetry));
624
625            // Test the exact moment the flush interval limit is met
626            executor.advance_clock(duration);
627
628            assert!(is_empty_state(&telemetry));
629        });
630    }
631
632    // TODO:
633    // Test settings
634    // Update FakeHTTPClient to keep track of the number of requests and assert on it
635
636    fn init_test(cx: &mut TestAppContext) {
637        cx.update(|cx| {
638            let settings_store = SettingsStore::test(cx);
639            cx.set_global(settings_store);
640        });
641    }
642
643    fn is_empty_state(telemetry: &Telemetry) -> bool {
644        telemetry.state.lock().events_queue.is_empty()
645            && telemetry.state.lock().flush_events_task.is_none()
646            && telemetry.state.lock().first_event_date_time.is_none()
647    }
648}