telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  8use once_cell::sync::Lazy;
  9use parking_lot::Mutex;
 10use release_channel::ReleaseChannel;
 11use settings::{Settings, SettingsStore};
 12use sha2::{Digest, Sha256};
 13use std::io::Write;
 14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 15use sysinfo::{
 16    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 17};
 18use telemetry_events::{
 19    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
 20    EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 21};
 22use tempfile::NamedTempFile;
 23use util::http::{self, HttpClient, HttpClientWithUrl, Method};
 24#[cfg(not(debug_assertions))]
 25use util::ResultExt;
 26use util::TryFutureExt;
 27
 28use self::event_coalescer::EventCoalescer;
 29
/// Collects telemetry events, queues them, and sends them in batches to the
/// `/telemetry/events` endpoint of the Zed API.
pub struct Telemetry {
    /// Clock used to timestamp queued events.
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to build the endpoint URL and send event batches.
    http_client: Arc<HttpClientWithUrl>,
    /// Executor that runs the flush timer and the upload task.
    executor: BackgroundExecutor,
    /// Mutable state shared between the reporting methods and spawned tasks.
    state: Arc<Mutex<TelemetryState>>,
}
 36
/// Mutable telemetry state; always accessed through the mutex held by `Telemetry`.
struct TelemetryState {
    /// Copy of the telemetry settings, refreshed whenever the settings store changes.
    settings: TelemetrySettings,
    /// Per logged-in user.
    metrics_id: Option<Arc<str>>,
    /// Per app installation (different for dev, nightly, preview, and stable).
    installation_id: Option<Arc<str>>,
    /// Per app launch.
    session_id: Option<String>,
    /// Display name of the release channel, when one is registered globally.
    release_channel: Option<&'static str>,
    /// Application metadata (app version, OS name, OS version) sent with every batch.
    app_metadata: AppMetadata,
    /// CPU architecture, taken from `env::consts::ARCH`.
    architecture: &'static str,
    /// Events waiting to be sent by the next flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`.
    flush_events_task: Option<Task<()>>,
    /// Temporary file that flushed events are appended to, one JSON object per line.
    /// Only created in builds without debug assertions.
    log_file: Option<NamedTempFile>,
    /// Whether the signed-in user is staff; `None` until user info has been set.
    is_staff: Option<bool>,
    /// Timestamp of the first event queued since the last flush; the base for
    /// each event's `milliseconds_since_first_event`.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Coalesces edit activity into periods (see `log_edit_event`).
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush once an installation id is known.
    max_queue_size: usize,
}
 53
/// Initial value of `TelemetryState::max_queue_size`: 5 events in builds with
/// debug assertions, 50 otherwise.
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;
 59
/// Delay between arming the flush timer (done when an event is reported and no
/// timer is pending) and the flush itself: 1 second in builds with debug
/// assertions, 5 minutes otherwise.
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 65static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 66    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 67        .map(|s| s.as_bytes().into())
 68        .or_else(|| {
 69            env::var("ZED_CLIENT_CHECKSUM_SEED")
 70                .ok()
 71                .map(|s| s.as_bytes().into())
 72        })
 73});
 74
 75impl Telemetry {
 76    pub fn new(
 77        clock: Arc<dyn SystemClock>,
 78        client: Arc<HttpClientWithUrl>,
 79        cx: &mut AppContext,
 80    ) -> Arc<Self> {
 81        let release_channel =
 82            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
 83
 84        TelemetrySettings::register(cx);
 85
 86        let state = Arc::new(Mutex::new(TelemetryState {
 87            settings: *TelemetrySettings::get_global(cx),
 88            app_metadata: cx.app_metadata(),
 89            architecture: env::consts::ARCH,
 90            release_channel,
 91            installation_id: None,
 92            metrics_id: None,
 93            session_id: None,
 94            events_queue: Vec::new(),
 95            flush_events_task: None,
 96            log_file: None,
 97            is_staff: None,
 98            first_event_date_time: None,
 99            event_coalescer: EventCoalescer::new(clock.clone()),
100            max_queue_size: MAX_QUEUE_LEN,
101        }));
102
103        #[cfg(not(debug_assertions))]
104        cx.background_executor()
105            .spawn({
106                let state = state.clone();
107                async move {
108                    if let Some(tempfile) =
109                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
110                    {
111                        state.lock().log_file = Some(tempfile);
112                    }
113                }
114            })
115            .detach();
116
117        cx.observe_global::<SettingsStore>({
118            let state = state.clone();
119
120            move |cx| {
121                let mut state = state.lock();
122                state.settings = *TelemetrySettings::get_global(cx);
123            }
124        })
125        .detach();
126
127        // TODO: Replace all hardware stuff with nested SystemSpecs json
128        let this = Arc::new(Self {
129            clock,
130            http_client: client,
131            executor: cx.background_executor().clone(),
132            state,
133        });
134
135        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
136        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
137        std::mem::forget(cx.on_app_quit({
138            let this = this.clone();
139            move |_| this.shutdown_telemetry()
140        }));
141
142        this
143    }
144
    /// Test / `test-support` variant of the quit hook: reports nothing and
    /// returns a future that is already complete.
    #[cfg(any(test, feature = "test-support"))]
    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
        Task::ready(())
    }
149
150    // Skip calling this function in tests.
151    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
152    #[cfg(not(any(test, feature = "test-support")))]
153    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
154        self.report_app_event("close".to_string());
155        // TODO: close final edit period and make sure it's sent
156        Task::ready(())
157    }
158
159    pub fn log_file_path(&self) -> Option<PathBuf> {
160        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
161    }
162
163    pub fn start(
164        self: &Arc<Self>,
165        installation_id: Option<String>,
166        session_id: String,
167        cx: &mut AppContext,
168    ) {
169        let mut state = self.state.lock();
170        state.installation_id = installation_id.map(|id| id.into());
171        state.session_id = Some(session_id);
172        drop(state);
173
174        let this = self.clone();
175        cx.spawn(|_| async move {
176            // Avoiding calling `System::new_all()`, as there have been crashes related to it
177            let refresh_kind = RefreshKind::new()
178                .with_memory() // For memory usage
179                .with_processes(ProcessRefreshKind::everything()) // For process usage
180                .with_cpu(CpuRefreshKind::everything()); // For core count
181
182            let mut system = System::new_with_specifics(refresh_kind);
183
184            // Avoiding calling `refresh_all()`, just update what we need
185            system.refresh_specifics(refresh_kind);
186
187            // Waiting some amount of time before the first query is important to get a reasonable value
188            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
189            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
190
191            loop {
192                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
193
194                system.refresh_specifics(refresh_kind);
195
196                let current_process = Pid::from_u32(std::process::id());
197                let Some(process) = system.processes().get(&current_process) else {
198                    let process = current_process;
199                    log::error!("Failed to find own process {process:?} in system process table");
200                    // TODO: Fire an error telemetry event
201                    return;
202                };
203
204                this.report_memory_event(process.memory(), process.virtual_memory());
205                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
206            }
207        })
208        .detach();
209    }
210
211    pub fn set_authenticated_user_info(
212        self: &Arc<Self>,
213        metrics_id: Option<String>,
214        is_staff: bool,
215    ) {
216        let mut state = self.state.lock();
217
218        if !state.settings.metrics {
219            return;
220        }
221
222        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
223        state.metrics_id = metrics_id.clone();
224        state.is_staff = Some(is_staff);
225        drop(state);
226    }
227
228    pub fn report_editor_event(
229        self: &Arc<Self>,
230        file_extension: Option<String>,
231        vim_mode: bool,
232        operation: &'static str,
233        copilot_enabled: bool,
234        copilot_enabled_for_language: bool,
235    ) {
236        let event = Event::Editor(EditorEvent {
237            file_extension,
238            vim_mode,
239            operation: operation.into(),
240            copilot_enabled,
241            copilot_enabled_for_language,
242        });
243
244        self.report_event(event)
245    }
246
247    pub fn report_copilot_event(
248        self: &Arc<Self>,
249        suggestion_id: Option<String>,
250        suggestion_accepted: bool,
251        file_extension: Option<String>,
252    ) {
253        let event = Event::Copilot(CopilotEvent {
254            suggestion_id,
255            suggestion_accepted,
256            file_extension,
257        });
258
259        self.report_event(event)
260    }
261
262    pub fn report_assistant_event(
263        self: &Arc<Self>,
264        conversation_id: Option<String>,
265        kind: AssistantKind,
266        model: &str,
267    ) {
268        let event = Event::Assistant(AssistantEvent {
269            conversation_id,
270            kind,
271            model: model.to_string(),
272        });
273
274        self.report_event(event)
275    }
276
277    pub fn report_call_event(
278        self: &Arc<Self>,
279        operation: &'static str,
280        room_id: Option<u64>,
281        channel_id: Option<ChannelId>,
282    ) {
283        let event = Event::Call(CallEvent {
284            operation: operation.to_string(),
285            room_id,
286            channel_id: channel_id.map(|cid| cid.0),
287        });
288
289        self.report_event(event)
290    }
291
292    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
293        let event = Event::Cpu(CpuEvent {
294            usage_as_percentage,
295            core_count,
296        });
297
298        self.report_event(event)
299    }
300
301    pub fn report_memory_event(
302        self: &Arc<Self>,
303        memory_in_bytes: u64,
304        virtual_memory_in_bytes: u64,
305    ) {
306        let event = Event::Memory(MemoryEvent {
307            memory_in_bytes,
308            virtual_memory_in_bytes,
309        });
310
311        self.report_event(event)
312    }
313
314    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
315        let event = Event::App(AppEvent { operation });
316
317        self.report_event(event.clone());
318
319        event
320    }
321
322    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
323        let event = Event::Setting(SettingEvent {
324            setting: setting.to_string(),
325            value,
326        });
327
328        self.report_event(event)
329    }
330
331    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
332        let mut state = self.state.lock();
333        let period_data = state.event_coalescer.log_event(environment);
334        drop(state);
335
336        if let Some((start, end, environment)) = period_data {
337            let event = Event::Edit(EditEvent {
338                duration: end.timestamp_millis() - start.timestamp_millis(),
339                environment: environment.to_string(),
340            });
341
342            self.report_event(event);
343        }
344    }
345
346    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
347        let event = Event::Action(ActionEvent {
348            source: source.to_string(),
349            action,
350        });
351
352        self.report_event(event)
353    }
354
355    fn report_event(self: &Arc<Self>, event: Event) {
356        let mut state = self.state.lock();
357
358        if !state.settings.metrics {
359            return;
360        }
361
362        if state.flush_events_task.is_none() {
363            let this = self.clone();
364            let executor = self.executor.clone();
365            state.flush_events_task = Some(self.executor.spawn(async move {
366                executor.timer(FLUSH_INTERVAL).await;
367                this.flush_events();
368            }));
369        }
370
371        let date_time = self.clock.utc_now();
372
373        let milliseconds_since_first_event = match state.first_event_date_time {
374            Some(first_event_date_time) => {
375                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
376            }
377            None => {
378                state.first_event_date_time = Some(date_time);
379                0
380            }
381        };
382
383        let signed_in = state.metrics_id.is_some();
384        state.events_queue.push(EventWrapper {
385            signed_in,
386            milliseconds_since_first_event,
387            event,
388        });
389
390        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
391            drop(state);
392            self.flush_events();
393        }
394    }
395
396    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
397        self.state.lock().metrics_id.clone()
398    }
399
400    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
401        self.state.lock().installation_id.clone()
402    }
403
404    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
405        self.state.lock().is_staff
406    }
407
408    pub fn flush_events(self: &Arc<Self>) {
409        let mut state = self.state.lock();
410        state.first_event_date_time = None;
411        let mut events = mem::take(&mut state.events_queue);
412        state.flush_events_task.take();
413        drop(state);
414        if events.is_empty() {
415            return;
416        }
417
418        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
419            return;
420        };
421
422        let this = self.clone();
423        self.executor
424            .spawn(
425                async move {
426                    let mut json_bytes = Vec::new();
427
428                    if let Some(file) = &mut this.state.lock().log_file {
429                        let file = file.as_file_mut();
430                        for event in &mut events {
431                            json_bytes.clear();
432                            serde_json::to_writer(&mut json_bytes, event)?;
433                            file.write_all(&json_bytes)?;
434                            file.write_all(b"\n")?;
435                        }
436                    }
437
438                    {
439                        let state = this.state.lock();
440                        let request_body = EventRequestBody {
441                            installation_id: state.installation_id.as_deref().map(Into::into),
442                            session_id: state.session_id.clone(),
443                            is_staff: state.is_staff,
444                            app_version: state
445                                .app_metadata
446                                .app_version
447                                .unwrap_or_default()
448                                .to_string(),
449                            os_name: state.app_metadata.os_name.to_string(),
450                            os_version: state
451                                .app_metadata
452                                .os_version
453                                .map(|version| version.to_string()),
454                            architecture: state.architecture.to_string(),
455
456                            release_channel: state.release_channel.map(Into::into),
457                            events,
458                        };
459                        json_bytes.clear();
460                        serde_json::to_writer(&mut json_bytes, &request_body)?;
461                    }
462
463                    let mut summer = Sha256::new();
464                    summer.update(checksum_seed);
465                    summer.update(&json_bytes);
466                    summer.update(checksum_seed);
467                    let mut checksum = String::new();
468                    for byte in summer.finalize().as_slice() {
469                        use std::fmt::Write;
470                        write!(&mut checksum, "{:02x}", byte).unwrap();
471                    }
472
473                    let request = http::Request::builder()
474                        .method(Method::POST)
475                        .uri(this.http_client.build_zed_api_url("/telemetry/events"))
476                        .header("Content-Type", "text/plain")
477                        .header("x-zed-checksum", checksum)
478                        .body(json_bytes.into());
479
480                    let response = this.http_client.send(request?).await?;
481                    if response.status() != 200 {
482                        log::error!("Failed to send events: HTTP {:?}", response.status());
483                    }
484                    anyhow::Ok(())
485                }
486                .log_err(),
487            )
488            .detach();
489    }
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495    use chrono::TimeZone;
496    use clock::FakeSystemClock;
497    use gpui::TestAppContext;
498    use util::http::FakeHttpClient;
499
500    #[gpui::test]
501    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
502        init_test(cx);
503        let clock = Arc::new(FakeSystemClock::new(
504            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
505        ));
506        let http = FakeHttpClient::with_200_response();
507        let installation_id = Some("installation_id".to_string());
508        let session_id = "session_id".to_string();
509
510        cx.update(|cx| {
511            let telemetry = Telemetry::new(clock.clone(), http, cx);
512
513            telemetry.state.lock().max_queue_size = 4;
514            telemetry.start(installation_id, session_id, cx);
515
516            assert!(is_empty_state(&telemetry));
517
518            let first_date_time = clock.utc_now();
519            let operation = "test".to_string();
520
521            let event = telemetry.report_app_event(operation.clone());
522            assert_eq!(
523                event,
524                Event::App(AppEvent {
525                    operation: operation.clone(),
526                })
527            );
528            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
529            assert!(telemetry.state.lock().flush_events_task.is_some());
530            assert_eq!(
531                telemetry.state.lock().first_event_date_time,
532                Some(first_date_time)
533            );
534
535            clock.advance(chrono::Duration::milliseconds(100));
536
537            let event = telemetry.report_app_event(operation.clone());
538            assert_eq!(
539                event,
540                Event::App(AppEvent {
541                    operation: operation.clone(),
542                })
543            );
544            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
545            assert!(telemetry.state.lock().flush_events_task.is_some());
546            assert_eq!(
547                telemetry.state.lock().first_event_date_time,
548                Some(first_date_time)
549            );
550
551            clock.advance(chrono::Duration::milliseconds(100));
552
553            let event = telemetry.report_app_event(operation.clone());
554            assert_eq!(
555                event,
556                Event::App(AppEvent {
557                    operation: operation.clone(),
558                })
559            );
560            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
561            assert!(telemetry.state.lock().flush_events_task.is_some());
562            assert_eq!(
563                telemetry.state.lock().first_event_date_time,
564                Some(first_date_time)
565            );
566
567            clock.advance(chrono::Duration::milliseconds(100));
568
569            // Adding a 4th event should cause a flush
570            let event = telemetry.report_app_event(operation.clone());
571            assert_eq!(
572                event,
573                Event::App(AppEvent {
574                    operation: operation.clone(),
575                })
576            );
577
578            assert!(is_empty_state(&telemetry));
579        });
580    }
581
582    #[gpui::test]
583    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
584        init_test(cx);
585        let clock = Arc::new(FakeSystemClock::new(
586            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
587        ));
588        let http = FakeHttpClient::with_200_response();
589        let installation_id = Some("installation_id".to_string());
590        let session_id = "session_id".to_string();
591
592        cx.update(|cx| {
593            let telemetry = Telemetry::new(clock.clone(), http, cx);
594            telemetry.state.lock().max_queue_size = 4;
595            telemetry.start(installation_id, session_id, cx);
596
597            assert!(is_empty_state(&telemetry));
598
599            let first_date_time = clock.utc_now();
600            let operation = "test".to_string();
601
602            let event = telemetry.report_app_event(operation.clone());
603            assert_eq!(
604                event,
605                Event::App(AppEvent {
606                    operation: operation.clone(),
607                })
608            );
609            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
610            assert!(telemetry.state.lock().flush_events_task.is_some());
611            assert_eq!(
612                telemetry.state.lock().first_event_date_time,
613                Some(first_date_time)
614            );
615
616            let duration = Duration::from_millis(1);
617
618            // Test 1 millisecond before the flush interval limit is met
619            executor.advance_clock(FLUSH_INTERVAL - duration);
620
621            assert!(!is_empty_state(&telemetry));
622
623            // Test the exact moment the flush interval limit is met
624            executor.advance_clock(duration);
625
626            assert!(is_empty_state(&telemetry));
627        });
628    }
629
    // TODO:
    // - Test settings
    // - Update FakeHttpClient to keep track of the number of requests and assert on it
633
634    fn init_test(cx: &mut TestAppContext) {
635        cx.update(|cx| {
636            let settings_store = SettingsStore::test(cx);
637            cx.set_global(settings_store);
638        });
639    }
640
641    fn is_empty_state(telemetry: &Telemetry) -> bool {
642        telemetry.state.lock().events_queue.is_empty()
643            && telemetry.state.lock().flush_events_task.is_none()
644            && telemetry.state.lock().first_event_date_time.is_none()
645    }
646}