telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  8use once_cell::sync::Lazy;
  9use parking_lot::Mutex;
 10use release_channel::ReleaseChannel;
 11use settings::{Settings, SettingsStore};
 12use sha2::{Digest, Sha256};
 13use std::io::Write;
 14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 15use sysinfo::{CpuRefreshKind, MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
 16use telemetry_events::{
 17    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
 18    EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
 19};
 20use tempfile::NamedTempFile;
 21use util::http::{self, HttpClient, HttpClientWithUrl, Method};
 22#[cfg(not(debug_assertions))]
 23use util::ResultExt;
 24use util::TryFutureExt;
 25
 26use self::event_coalescer::EventCoalescer;
 27
 28pub struct Telemetry {
 29    clock: Arc<dyn SystemClock>,
 30    http_client: Arc<HttpClientWithUrl>,
 31    executor: BackgroundExecutor,
 32    state: Arc<Mutex<TelemetryState>>,
 33}
 34
 35struct TelemetryState {
 36    settings: TelemetrySettings,
 37    metrics_id: Option<Arc<str>>,      // Per logged-in user
 38    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 39    session_id: Option<String>,        // Per app launch
 40    release_channel: Option<&'static str>,
 41    app_metadata: AppMetadata,
 42    architecture: &'static str,
 43    events_queue: Vec<EventWrapper>,
 44    flush_events_task: Option<Task<()>>,
 45    log_file: Option<NamedTempFile>,
 46    is_staff: Option<bool>,
 47    first_event_date_time: Option<DateTime<Utc>>,
 48    event_coalescer: EventCoalescer,
 49    max_queue_size: usize,
 50}
 51
 52#[cfg(debug_assertions)]
 53const MAX_QUEUE_LEN: usize = 5;
 54
 55#[cfg(not(debug_assertions))]
 56const MAX_QUEUE_LEN: usize = 50;
 57
 58#[cfg(debug_assertions)]
 59const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 60
 61#[cfg(not(debug_assertions))]
 62const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 63static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 64    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 65        .map(|s| s.as_bytes().into())
 66        .or_else(|| {
 67            env::var("ZED_CLIENT_CHECKSUM_SEED")
 68                .ok()
 69                .map(|s| s.as_bytes().into())
 70        })
 71});
 72
 73impl Telemetry {
 74    pub fn new(
 75        clock: Arc<dyn SystemClock>,
 76        client: Arc<HttpClientWithUrl>,
 77        cx: &mut AppContext,
 78    ) -> Arc<Self> {
 79        let release_channel =
 80            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
 81
 82        TelemetrySettings::register(cx);
 83
 84        let state = Arc::new(Mutex::new(TelemetryState {
 85            settings: *TelemetrySettings::get_global(cx),
 86            app_metadata: cx.app_metadata(),
 87            architecture: env::consts::ARCH,
 88            release_channel,
 89            installation_id: None,
 90            metrics_id: None,
 91            session_id: None,
 92            events_queue: Vec::new(),
 93            flush_events_task: None,
 94            log_file: None,
 95            is_staff: None,
 96            first_event_date_time: None,
 97            event_coalescer: EventCoalescer::new(clock.clone()),
 98            max_queue_size: MAX_QUEUE_LEN,
 99        }));
100
101        #[cfg(not(debug_assertions))]
102        cx.background_executor()
103            .spawn({
104                let state = state.clone();
105                async move {
106                    if let Some(tempfile) =
107                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
108                    {
109                        state.lock().log_file = Some(tempfile);
110                    }
111                }
112            })
113            .detach();
114
115        cx.observe_global::<SettingsStore>({
116            let state = state.clone();
117
118            move |cx| {
119                let mut state = state.lock();
120                state.settings = *TelemetrySettings::get_global(cx);
121            }
122        })
123        .detach();
124
125        // TODO: Replace all hardware stuff with nested SystemSpecs json
126        let this = Arc::new(Self {
127            clock,
128            http_client: client,
129            executor: cx.background_executor().clone(),
130            state,
131        });
132
133        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
134        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
135        std::mem::forget(cx.on_app_quit({
136            let this = this.clone();
137            move |_| this.shutdown_telemetry()
138        }));
139
140        this
141    }
142
143    #[cfg(any(test, feature = "test-support"))]
144    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
145        Task::ready(())
146    }
147
148    // Skip calling this function in tests.
149    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
150    #[cfg(not(any(test, feature = "test-support")))]
151    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
152        self.report_app_event("close".to_string());
153        // TODO: close final edit period and make sure it's sent
154        Task::ready(())
155    }
156
157    pub fn log_file_path(&self) -> Option<PathBuf> {
158        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
159    }
160
161    pub fn start(
162        self: &Arc<Self>,
163        installation_id: Option<String>,
164        session_id: String,
165        cx: &mut AppContext,
166    ) {
167        let mut state = self.state.lock();
168        state.installation_id = installation_id.map(|id| id.into());
169        state.session_id = Some(session_id);
170        drop(state);
171
172        let this = self.clone();
173        cx.spawn(|_| async move {
174            // Avoiding calling `System::new_all()`, as there have been crashes related to it
175            let refresh_kind = RefreshKind::new()
176                .with_memory(MemoryRefreshKind::everything()) // For memory usage
177                .with_processes(ProcessRefreshKind::everything()) // For process usage
178                .with_cpu(CpuRefreshKind::everything()); // For core count
179
180            let mut system = System::new_with_specifics(refresh_kind);
181
182            // Avoiding calling `refresh_all()`, just update what we need
183            system.refresh_specifics(refresh_kind);
184
185            // Waiting some amount of time before the first query is important to get a reasonable value
186            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
187            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
188
189            loop {
190                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
191
192                system.refresh_specifics(refresh_kind);
193
194                let current_process = Pid::from_u32(std::process::id());
195                let Some(process) = system.processes().get(&current_process) else {
196                    let process = current_process;
197                    log::error!("Failed to find own process {process:?} in system process table");
198                    // TODO: Fire an error telemetry event
199                    return;
200                };
201
202                this.report_memory_event(process.memory(), process.virtual_memory());
203                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
204            }
205        })
206        .detach();
207    }
208
209    pub fn set_authenticated_user_info(
210        self: &Arc<Self>,
211        metrics_id: Option<String>,
212        is_staff: bool,
213    ) {
214        let mut state = self.state.lock();
215
216        if !state.settings.metrics {
217            return;
218        }
219
220        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
221        state.metrics_id = metrics_id.clone();
222        state.is_staff = Some(is_staff);
223        drop(state);
224    }
225
226    pub fn report_editor_event(
227        self: &Arc<Self>,
228        file_extension: Option<String>,
229        vim_mode: bool,
230        operation: &'static str,
231        copilot_enabled: bool,
232        copilot_enabled_for_language: bool,
233    ) {
234        let event = Event::Editor(EditorEvent {
235            file_extension,
236            vim_mode,
237            operation: operation.into(),
238            copilot_enabled,
239            copilot_enabled_for_language,
240        });
241
242        self.report_event(event)
243    }
244
245    pub fn report_copilot_event(
246        self: &Arc<Self>,
247        suggestion_id: Option<String>,
248        suggestion_accepted: bool,
249        file_extension: Option<String>,
250    ) {
251        let event = Event::Copilot(CopilotEvent {
252            suggestion_id,
253            suggestion_accepted,
254            file_extension,
255        });
256
257        self.report_event(event)
258    }
259
260    pub fn report_assistant_event(
261        self: &Arc<Self>,
262        conversation_id: Option<String>,
263        kind: AssistantKind,
264        model: &str,
265    ) {
266        let event = Event::Assistant(AssistantEvent {
267            conversation_id,
268            kind,
269            model: model.to_string(),
270        });
271
272        self.report_event(event)
273    }
274
275    pub fn report_call_event(
276        self: &Arc<Self>,
277        operation: &'static str,
278        room_id: Option<u64>,
279        channel_id: Option<ChannelId>,
280    ) {
281        let event = Event::Call(CallEvent {
282            operation: operation.to_string(),
283            room_id,
284            channel_id: channel_id.map(|cid| cid.0),
285        });
286
287        self.report_event(event)
288    }
289
290    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
291        let event = Event::Cpu(CpuEvent {
292            usage_as_percentage,
293            core_count,
294        });
295
296        self.report_event(event)
297    }
298
299    pub fn report_memory_event(
300        self: &Arc<Self>,
301        memory_in_bytes: u64,
302        virtual_memory_in_bytes: u64,
303    ) {
304        let event = Event::Memory(MemoryEvent {
305            memory_in_bytes,
306            virtual_memory_in_bytes,
307        });
308
309        self.report_event(event)
310    }
311
312    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
313        let event = Event::App(AppEvent { operation });
314
315        self.report_event(event.clone());
316
317        event
318    }
319
320    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
321        let event = Event::Setting(SettingEvent {
322            setting: setting.to_string(),
323            value,
324        });
325
326        self.report_event(event)
327    }
328
329    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
330        let mut state = self.state.lock();
331        let period_data = state.event_coalescer.log_event(environment);
332        drop(state);
333
334        if let Some((start, end, environment)) = period_data {
335            let event = Event::Edit(EditEvent {
336                duration: end.timestamp_millis() - start.timestamp_millis(),
337                environment: environment.to_string(),
338            });
339
340            self.report_event(event);
341        }
342    }
343
344    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
345        let event = Event::Action(ActionEvent {
346            source: source.to_string(),
347            action,
348        });
349
350        self.report_event(event)
351    }
352
353    fn report_event(self: &Arc<Self>, event: Event) {
354        let mut state = self.state.lock();
355
356        if !state.settings.metrics {
357            return;
358        }
359
360        if state.flush_events_task.is_none() {
361            let this = self.clone();
362            let executor = self.executor.clone();
363            state.flush_events_task = Some(self.executor.spawn(async move {
364                executor.timer(FLUSH_INTERVAL).await;
365                this.flush_events();
366            }));
367        }
368
369        let date_time = self.clock.utc_now();
370
371        let milliseconds_since_first_event = match state.first_event_date_time {
372            Some(first_event_date_time) => {
373                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
374            }
375            None => {
376                state.first_event_date_time = Some(date_time);
377                0
378            }
379        };
380
381        let signed_in = state.metrics_id.is_some();
382        state.events_queue.push(EventWrapper {
383            signed_in,
384            milliseconds_since_first_event,
385            event,
386        });
387
388        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
389            drop(state);
390            self.flush_events();
391        }
392    }
393
394    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
395        self.state.lock().metrics_id.clone()
396    }
397
398    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
399        self.state.lock().installation_id.clone()
400    }
401
402    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
403        self.state.lock().is_staff
404    }
405
406    pub fn flush_events(self: &Arc<Self>) {
407        let mut state = self.state.lock();
408        state.first_event_date_time = None;
409        let mut events = mem::take(&mut state.events_queue);
410        state.flush_events_task.take();
411        drop(state);
412        if events.is_empty() {
413            return;
414        }
415
416        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
417            return;
418        };
419
420        let this = self.clone();
421        self.executor
422            .spawn(
423                async move {
424                    let mut json_bytes = Vec::new();
425
426                    if let Some(file) = &mut this.state.lock().log_file {
427                        let file = file.as_file_mut();
428                        for event in &mut events {
429                            json_bytes.clear();
430                            serde_json::to_writer(&mut json_bytes, event)?;
431                            file.write_all(&json_bytes)?;
432                            file.write_all(b"\n")?;
433                        }
434                    }
435
436                    {
437                        let state = this.state.lock();
438                        let request_body = EventRequestBody {
439                            installation_id: state.installation_id.as_deref().map(Into::into),
440                            session_id: state.session_id.clone(),
441                            is_staff: state.is_staff,
442                            app_version: state
443                                .app_metadata
444                                .app_version
445                                .unwrap_or_default()
446                                .to_string(),
447                            os_name: state.app_metadata.os_name.to_string(),
448                            os_version: state
449                                .app_metadata
450                                .os_version
451                                .map(|version| version.to_string()),
452                            architecture: state.architecture.to_string(),
453
454                            release_channel: state.release_channel.map(Into::into),
455                            events,
456                        };
457                        json_bytes.clear();
458                        serde_json::to_writer(&mut json_bytes, &request_body)?;
459                    }
460
461                    let mut summer = Sha256::new();
462                    summer.update(checksum_seed);
463                    summer.update(&json_bytes);
464                    summer.update(checksum_seed);
465                    let mut checksum = String::new();
466                    for byte in summer.finalize().as_slice() {
467                        use std::fmt::Write;
468                        write!(&mut checksum, "{:02x}", byte).unwrap();
469                    }
470
471                    let request = http::Request::builder()
472                        .method(Method::POST)
473                        .uri(this.http_client.build_zed_api_url("/telemetry/events"))
474                        .header("Content-Type", "text/plain")
475                        .header("x-zed-checksum", checksum)
476                        .body(json_bytes.into());
477
478                    let response = this.http_client.send(request?).await?;
479                    if response.status() != 200 {
480                        log::error!("Failed to send events: HTTP {:?}", response.status());
481                    }
482                    anyhow::Ok(())
483                }
484                .log_err(),
485            )
486            .detach();
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Reaching `max_queue_size` flushes the queue synchronously.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = fake_clock();

        cx.update(|cx| {
            let telemetry = start_test_telemetry(clock.clone(), cx);
            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test";

            // The first three events are queued; the batch start time stays fixed.
            for expected_len in 1..=3 {
                report_and_check(&telemetry, operation);
                assert_queued(&telemetry, expected_len, first_date_time);
                clock.advance(chrono::Duration::milliseconds(100));
            }

            // Adding a 4th event should cause a flush
            report_and_check(&telemetry, operation);
            assert!(is_empty_state(&telemetry));
        });
    }

    /// Despite the name, this checks timer-driven flushing: a queued event is
    /// flushed exactly when `FLUSH_INTERVAL` has elapsed, not before.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let clock = fake_clock();

        cx.update(|cx| {
            let telemetry = start_test_telemetry(clock.clone(), cx);
            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            report_and_check(&telemetry, "test");
            assert_queued(&telemetry, 1, first_date_time);

            let one_ms = Duration::from_millis(1);

            // One millisecond short of the flush interval: still queued.
            executor.advance_clock(FLUSH_INTERVAL - one_ms);
            assert!(!is_empty_state(&telemetry));

            // Exactly at the flush interval: the timer fires and the queue is flushed.
            executor.advance_clock(one_ms);
            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` as a global.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Fake clock pinned to a fixed date so event timestamps are deterministic.
    fn fake_clock() -> Arc<FakeSystemClock> {
        Arc::new(FakeSystemClock::new(
            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
        ))
    }

    /// Builds a `Telemetry` backed by a fake HTTP client, lowers its queue limit to 4
    /// and starts it with fixed installation/session ids.
    fn start_test_telemetry(clock: Arc<FakeSystemClock>, cx: &mut AppContext) -> Arc<Telemetry> {
        let telemetry = Telemetry::new(clock, FakeHttpClient::with_200_response(), cx);
        telemetry.state.lock().max_queue_size = 4;
        telemetry.start(
            Some("installation_id".to_string()),
            "session_id".to_string(),
            cx,
        );
        telemetry
    }

    /// Reports an app event and checks that the returned event echoes `operation`.
    fn report_and_check(telemetry: &Arc<Telemetry>, operation: &str) {
        let event = telemetry.report_app_event(operation.to_string());
        assert_eq!(
            event,
            Event::App(AppEvent {
                operation: operation.to_string(),
            })
        );
    }

    /// Asserts that `len` events are queued, a flush timer is pending and the batch
    /// started at `first_date_time`.
    fn assert_queued(telemetry: &Telemetry, len: usize, first_date_time: DateTime<Utc>) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// True when nothing is queued, no flush timer is pending and the batch start
    /// time has been cleared.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}