telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use chrono::{DateTime, Utc};
  5use clock::SystemClock;
  6use futures::Future;
  7use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  8use once_cell::sync::Lazy;
  9use parking_lot::Mutex;
 10use release_channel::ReleaseChannel;
 11use serde::Serialize;
 12use settings::{Settings, SettingsStore};
 13use sha2::{Digest, Sha256};
 14use std::io::Write;
 15use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 16use sysinfo::{
 17    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 18};
 19use tempfile::NamedTempFile;
 20use util::http::{self, HttpClient, Method, ZedHttpClient};
 21#[cfg(not(debug_assertions))]
 22use util::ResultExt;
 23use util::TryFutureExt;
 24
 25use self::event_coalescer::EventCoalescer;
 26
 27pub struct Telemetry {
 28    clock: Arc<dyn SystemClock>,
 29    http_client: Arc<ZedHttpClient>,
 30    executor: BackgroundExecutor,
 31    state: Arc<Mutex<TelemetryState>>,
 32}
 33
 34struct TelemetryState {
 35    settings: TelemetrySettings,
 36    metrics_id: Option<Arc<str>>,      // Per logged-in user
 37    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 38    session_id: Option<Arc<str>>,      // Per app launch
 39    release_channel: Option<&'static str>,
 40    app_metadata: AppMetadata,
 41    architecture: &'static str,
 42    events_queue: Vec<EventWrapper>,
 43    flush_events_task: Option<Task<()>>,
 44    log_file: Option<NamedTempFile>,
 45    is_staff: Option<bool>,
 46    first_event_date_time: Option<DateTime<Utc>>,
 47    event_coalescer: EventCoalescer,
 48    max_queue_size: usize,
 49}
 50
 51#[derive(Serialize, Debug)]
 52struct EventRequestBody {
 53    installation_id: Option<Arc<str>>,
 54    session_id: Option<Arc<str>>,
 55    is_staff: Option<bool>,
 56    app_version: Option<String>,
 57    os_name: &'static str,
 58    os_version: Option<String>,
 59    architecture: &'static str,
 60    release_channel: Option<&'static str>,
 61    events: Vec<EventWrapper>,
 62}
 63
 64#[derive(Serialize, Debug)]
 65struct EventWrapper {
 66    signed_in: bool,
 67    #[serde(flatten)]
 68    event: Event,
 69}
 70
 71#[derive(Clone, Debug, PartialEq, Serialize)]
 72#[serde(rename_all = "snake_case")]
 73pub enum AssistantKind {
 74    Panel,
 75    Inline,
 76}
 77
 78#[derive(Clone, Debug, PartialEq, Serialize)]
 79#[serde(tag = "type")]
 80pub enum Event {
 81    Editor {
 82        operation: &'static str,
 83        file_extension: Option<String>,
 84        vim_mode: bool,
 85        copilot_enabled: bool,
 86        copilot_enabled_for_language: bool,
 87        milliseconds_since_first_event: i64,
 88    },
 89    Copilot {
 90        suggestion_id: Option<String>,
 91        suggestion_accepted: bool,
 92        file_extension: Option<String>,
 93        milliseconds_since_first_event: i64,
 94    },
 95    Call {
 96        operation: &'static str,
 97        room_id: Option<u64>,
 98        channel_id: Option<u64>,
 99        milliseconds_since_first_event: i64,
100    },
101    Assistant {
102        conversation_id: Option<String>,
103        kind: AssistantKind,
104        model: &'static str,
105        milliseconds_since_first_event: i64,
106    },
107    Cpu {
108        usage_as_percentage: f32,
109        core_count: u32,
110        milliseconds_since_first_event: i64,
111    },
112    Memory {
113        memory_in_bytes: u64,
114        virtual_memory_in_bytes: u64,
115        milliseconds_since_first_event: i64,
116    },
117    App {
118        operation: String,
119        milliseconds_since_first_event: i64,
120    },
121    Setting {
122        setting: &'static str,
123        value: String,
124        milliseconds_since_first_event: i64,
125    },
126    Edit {
127        duration: i64,
128        environment: &'static str,
129        milliseconds_since_first_event: i64,
130    },
131    Action {
132        source: &'static str,
133        action: String,
134        milliseconds_since_first_event: i64,
135    },
136}
137
/// Queue length that triggers an immediate flush in debug builds (kept small for quick feedback).
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Queue length that triggers an immediate flush in release builds.
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between the first queued event and the timed flush in debug builds.
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event and the timed flush in release builds (five minutes).
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
149
150static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
151    option_env!("ZED_CLIENT_CHECKSUM_SEED")
152        .map(|s| s.as_bytes().into())
153        .or_else(|| {
154            env::var("ZED_CLIENT_CHECKSUM_SEED")
155                .ok()
156                .map(|s| s.as_bytes().into())
157        })
158});
159
160impl Telemetry {
161    pub fn new(
162        clock: Arc<dyn SystemClock>,
163        client: Arc<ZedHttpClient>,
164        cx: &mut AppContext,
165    ) -> Arc<Self> {
166        let release_channel =
167            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
168
169        TelemetrySettings::register(cx);
170
171        let state = Arc::new(Mutex::new(TelemetryState {
172            settings: TelemetrySettings::get_global(cx).clone(),
173            app_metadata: cx.app_metadata(),
174            architecture: env::consts::ARCH,
175            release_channel,
176            installation_id: None,
177            metrics_id: None,
178            session_id: None,
179            events_queue: Vec::new(),
180            flush_events_task: None,
181            log_file: None,
182            is_staff: None,
183            first_event_date_time: None,
184            event_coalescer: EventCoalescer::new(),
185            max_queue_size: MAX_QUEUE_LEN,
186        }));
187
188        #[cfg(not(debug_assertions))]
189        cx.background_executor()
190            .spawn({
191                let state = state.clone();
192                async move {
193                    if let Some(tempfile) =
194                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
195                    {
196                        state.lock().log_file = Some(tempfile);
197                    }
198                }
199            })
200            .detach();
201
202        cx.observe_global::<SettingsStore>({
203            let state = state.clone();
204
205            move |cx| {
206                let mut state = state.lock();
207                state.settings = TelemetrySettings::get_global(cx).clone();
208            }
209        })
210        .detach();
211
212        // TODO: Replace all hardware stuff with nested SystemSpecs json
213        let this = Arc::new(Self {
214            clock,
215            http_client: client,
216            executor: cx.background_executor().clone(),
217            state,
218        });
219
220        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
221        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
222        std::mem::forget(cx.on_app_quit({
223            let this = this.clone();
224            move |_| this.shutdown_telemetry()
225        }));
226
227        this
228    }
229
230    #[cfg(any(test, feature = "test-support"))]
231    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
232        Task::ready(())
233    }
234
235    // Skip calling this function in tests.
236    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
237    #[cfg(not(any(test, feature = "test-support")))]
238    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
239        self.report_app_event("close".to_string());
240        // TODO: close final edit period and make sure it's sent
241        Task::ready(())
242    }
243
244    pub fn log_file_path(&self) -> Option<PathBuf> {
245        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
246    }
247
248    pub fn start(
249        self: &Arc<Self>,
250        installation_id: Option<String>,
251        session_id: String,
252        cx: &mut AppContext,
253    ) {
254        let mut state = self.state.lock();
255        state.installation_id = installation_id.map(|id| id.into());
256        state.session_id = Some(session_id.into());
257        drop(state);
258
259        let this = self.clone();
260        cx.spawn(|_| async move {
261            // Avoiding calling `System::new_all()`, as there have been crashes related to it
262            let refresh_kind = RefreshKind::new()
263                .with_memory() // For memory usage
264                .with_processes(ProcessRefreshKind::everything()) // For process usage
265                .with_cpu(CpuRefreshKind::everything()); // For core count
266
267            let mut system = System::new_with_specifics(refresh_kind);
268
269            // Avoiding calling `refresh_all()`, just update what we need
270            system.refresh_specifics(refresh_kind);
271
272            // Waiting some amount of time before the first query is important to get a reasonable value
273            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
274            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
275
276            loop {
277                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
278
279                system.refresh_specifics(refresh_kind);
280
281                let current_process = Pid::from_u32(std::process::id());
282                let Some(process) = system.processes().get(&current_process) else {
283                    let process = current_process;
284                    log::error!("Failed to find own process {process:?} in system process table");
285                    // TODO: Fire an error telemetry event
286                    return;
287                };
288
289                this.report_memory_event(process.memory(), process.virtual_memory());
290                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
291            }
292        })
293        .detach();
294    }
295
296    pub fn set_authenticated_user_info(
297        self: &Arc<Self>,
298        metrics_id: Option<String>,
299        is_staff: bool,
300    ) {
301        let mut state = self.state.lock();
302
303        if !state.settings.metrics {
304            return;
305        }
306
307        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
308        state.metrics_id = metrics_id.clone();
309        state.is_staff = Some(is_staff);
310        drop(state);
311    }
312
313    pub fn report_editor_event(
314        self: &Arc<Self>,
315        file_extension: Option<String>,
316        vim_mode: bool,
317        operation: &'static str,
318        copilot_enabled: bool,
319        copilot_enabled_for_language: bool,
320    ) {
321        let event = Event::Editor {
322            file_extension,
323            vim_mode,
324            operation,
325            copilot_enabled,
326            copilot_enabled_for_language,
327            milliseconds_since_first_event: self
328                .milliseconds_since_first_event(self.clock.utc_now()),
329        };
330
331        self.report_event(event)
332    }
333
334    pub fn report_copilot_event(
335        self: &Arc<Self>,
336        suggestion_id: Option<String>,
337        suggestion_accepted: bool,
338        file_extension: Option<String>,
339    ) {
340        let event = Event::Copilot {
341            suggestion_id,
342            suggestion_accepted,
343            file_extension,
344            milliseconds_since_first_event: self
345                .milliseconds_since_first_event(self.clock.utc_now()),
346        };
347
348        self.report_event(event)
349    }
350
351    pub fn report_assistant_event(
352        self: &Arc<Self>,
353        conversation_id: Option<String>,
354        kind: AssistantKind,
355        model: &'static str,
356    ) {
357        let event = Event::Assistant {
358            conversation_id,
359            kind,
360            model,
361            milliseconds_since_first_event: self
362                .milliseconds_since_first_event(self.clock.utc_now()),
363        };
364
365        self.report_event(event)
366    }
367
368    pub fn report_call_event(
369        self: &Arc<Self>,
370        operation: &'static str,
371        room_id: Option<u64>,
372        channel_id: Option<u64>,
373    ) {
374        let event = Event::Call {
375            operation,
376            room_id,
377            channel_id,
378            milliseconds_since_first_event: self
379                .milliseconds_since_first_event(self.clock.utc_now()),
380        };
381
382        self.report_event(event)
383    }
384
385    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
386        let event = Event::Cpu {
387            usage_as_percentage,
388            core_count,
389            milliseconds_since_first_event: self
390                .milliseconds_since_first_event(self.clock.utc_now()),
391        };
392
393        self.report_event(event)
394    }
395
396    pub fn report_memory_event(
397        self: &Arc<Self>,
398        memory_in_bytes: u64,
399        virtual_memory_in_bytes: u64,
400    ) {
401        let event = Event::Memory {
402            memory_in_bytes,
403            virtual_memory_in_bytes,
404            milliseconds_since_first_event: self
405                .milliseconds_since_first_event(self.clock.utc_now()),
406        };
407
408        self.report_event(event)
409    }
410
411    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
412        let event = Event::App {
413            operation,
414            milliseconds_since_first_event: self
415                .milliseconds_since_first_event(self.clock.utc_now()),
416        };
417
418        self.report_event(event.clone());
419
420        event
421    }
422
423    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
424        let event = Event::Setting {
425            setting,
426            value,
427            milliseconds_since_first_event: self
428                .milliseconds_since_first_event(self.clock.utc_now()),
429        };
430
431        self.report_event(event)
432    }
433
434    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
435        let mut state = self.state.lock();
436        let period_data = state.event_coalescer.log_event(environment);
437        drop(state);
438
439        if let Some((start, end, environment)) = period_data {
440            let event = Event::Edit {
441                duration: end.timestamp_millis() - start.timestamp_millis(),
442                environment,
443                milliseconds_since_first_event: self
444                    .milliseconds_since_first_event(self.clock.utc_now()),
445            };
446
447            self.report_event(event);
448        }
449    }
450
451    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
452        let event = Event::Action {
453            source,
454            action,
455            milliseconds_since_first_event: self
456                .milliseconds_since_first_event(self.clock.utc_now()),
457        };
458
459        self.report_event(event)
460    }
461
462    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
463        let mut state = self.state.lock();
464
465        match state.first_event_date_time {
466            Some(first_event_date_time) => {
467                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
468            }
469            None => {
470                state.first_event_date_time = Some(date_time);
471                0
472            }
473        }
474    }
475
476    fn report_event(self: &Arc<Self>, event: Event) {
477        let mut state = self.state.lock();
478
479        if !state.settings.metrics {
480            return;
481        }
482
483        if state.flush_events_task.is_none() {
484            let this = self.clone();
485            let executor = self.executor.clone();
486            state.flush_events_task = Some(self.executor.spawn(async move {
487                executor.timer(FLUSH_INTERVAL).await;
488                this.flush_events();
489            }));
490        }
491
492        let signed_in = state.metrics_id.is_some();
493        state.events_queue.push(EventWrapper { signed_in, event });
494
495        if state.installation_id.is_some() {
496            if state.events_queue.len() >= state.max_queue_size {
497                drop(state);
498                self.flush_events();
499            }
500        }
501    }
502
503    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
504        self.state.lock().metrics_id.clone()
505    }
506
507    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
508        self.state.lock().installation_id.clone()
509    }
510
511    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
512        self.state.lock().is_staff
513    }
514
515    pub fn flush_events(self: &Arc<Self>) {
516        let mut state = self.state.lock();
517        state.first_event_date_time = None;
518        let mut events = mem::take(&mut state.events_queue);
519        state.flush_events_task.take();
520        drop(state);
521        if events.is_empty() {
522            return;
523        }
524
525        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
526            return;
527        };
528
529        let this = self.clone();
530        self.executor
531            .spawn(
532                async move {
533                    let mut json_bytes = Vec::new();
534
535                    if let Some(file) = &mut this.state.lock().log_file {
536                        let file = file.as_file_mut();
537                        for event in &mut events {
538                            json_bytes.clear();
539                            serde_json::to_writer(&mut json_bytes, event)?;
540                            file.write_all(&json_bytes)?;
541                            file.write(b"\n")?;
542                        }
543                    }
544
545                    {
546                        let state = this.state.lock();
547                        let request_body = EventRequestBody {
548                            installation_id: state.installation_id.clone(),
549                            session_id: state.session_id.clone(),
550                            is_staff: state.is_staff.clone(),
551                            app_version: state
552                                .app_metadata
553                                .app_version
554                                .map(|version| version.to_string()),
555                            os_name: state.app_metadata.os_name,
556                            os_version: state
557                                .app_metadata
558                                .os_version
559                                .map(|version| version.to_string()),
560                            architecture: state.architecture,
561
562                            release_channel: state.release_channel,
563                            events,
564                        };
565                        json_bytes.clear();
566                        serde_json::to_writer(&mut json_bytes, &request_body)?;
567                    }
568
569                    let mut summer = Sha256::new();
570                    summer.update(checksum_seed);
571                    summer.update(&json_bytes);
572                    summer.update(checksum_seed);
573                    let mut checksum = String::new();
574                    for byte in summer.finalize().as_slice() {
575                        use std::fmt::Write;
576                        write!(&mut checksum, "{:02x}", byte).unwrap();
577                    }
578
579                    let request = http::Request::builder()
580                        .method(Method::POST)
581                        .uri(&this.http_client.zed_url("/api/events"))
582                        .header("Content-Type", "text/plain")
583                        .header("x-zed-checksum", checksum)
584                        .body(json_bytes.into());
585
586                    let response = this.http_client.send(request?).await?;
587                    if response.status() != 200 {
588                        log::error!("Failed to send events: HTTP {:?}", response.status());
589                    }
590                    anyhow::Ok(())
591                }
592                .log_err(),
593            )
594            .detach();
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use chrono::TimeZone;
602    use clock::FakeSystemClock;
603    use gpui::TestAppContext;
604    use util::http::FakeHttpClient;
605
606    #[gpui::test]
607    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
608        init_test(cx);
609        let clock = Arc::new(FakeSystemClock::new(
610            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
611        ));
612        let http = FakeHttpClient::with_200_response();
613        let installation_id = Some("installation_id".to_string());
614        let session_id = "session_id".to_string();
615
616        cx.update(|cx| {
617            let telemetry = Telemetry::new(clock.clone(), http, cx);
618
619            telemetry.state.lock().max_queue_size = 4;
620            telemetry.start(installation_id, session_id, cx);
621
622            assert!(is_empty_state(&telemetry));
623
624            let first_date_time = clock.utc_now();
625            let operation = "test".to_string();
626
627            let event = telemetry.report_app_event(operation.clone());
628            assert_eq!(
629                event,
630                Event::App {
631                    operation: operation.clone(),
632                    milliseconds_since_first_event: 0
633                }
634            );
635            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
636            assert!(telemetry.state.lock().flush_events_task.is_some());
637            assert_eq!(
638                telemetry.state.lock().first_event_date_time,
639                Some(first_date_time)
640            );
641
642            clock.advance(chrono::Duration::milliseconds(100));
643
644            let event = telemetry.report_app_event(operation.clone());
645            assert_eq!(
646                event,
647                Event::App {
648                    operation: operation.clone(),
649                    milliseconds_since_first_event: 100
650                }
651            );
652            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
653            assert!(telemetry.state.lock().flush_events_task.is_some());
654            assert_eq!(
655                telemetry.state.lock().first_event_date_time,
656                Some(first_date_time)
657            );
658
659            clock.advance(chrono::Duration::milliseconds(100));
660
661            let event = telemetry.report_app_event(operation.clone());
662            assert_eq!(
663                event,
664                Event::App {
665                    operation: operation.clone(),
666                    milliseconds_since_first_event: 200
667                }
668            );
669            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
670            assert!(telemetry.state.lock().flush_events_task.is_some());
671            assert_eq!(
672                telemetry.state.lock().first_event_date_time,
673                Some(first_date_time)
674            );
675
676            clock.advance(chrono::Duration::milliseconds(100));
677
678            // Adding a 4th event should cause a flush
679            let event = telemetry.report_app_event(operation.clone());
680            assert_eq!(
681                event,
682                Event::App {
683                    operation: operation.clone(),
684                    milliseconds_since_first_event: 300
685                }
686            );
687
688            assert!(is_empty_state(&telemetry));
689        });
690    }
691
692    #[gpui::test]
693    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
694        init_test(cx);
695        let clock = Arc::new(FakeSystemClock::new(
696            Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap(),
697        ));
698        let http = FakeHttpClient::with_200_response();
699        let installation_id = Some("installation_id".to_string());
700        let session_id = "session_id".to_string();
701
702        cx.update(|cx| {
703            let telemetry = Telemetry::new(clock.clone(), http, cx);
704            telemetry.state.lock().max_queue_size = 4;
705            telemetry.start(installation_id, session_id, cx);
706
707            assert!(is_empty_state(&telemetry));
708
709            let first_date_time = clock.utc_now();
710            let operation = "test".to_string();
711
712            let event = telemetry.report_app_event(operation.clone());
713            assert_eq!(
714                event,
715                Event::App {
716                    operation: operation.clone(),
717                    milliseconds_since_first_event: 0
718                }
719            );
720            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
721            assert!(telemetry.state.lock().flush_events_task.is_some());
722            assert_eq!(
723                telemetry.state.lock().first_event_date_time,
724                Some(first_date_time)
725            );
726
727            let duration = Duration::from_millis(1);
728
729            // Test 1 millisecond before the flush interval limit is met
730            executor.advance_clock(FLUSH_INTERVAL - duration);
731
732            assert!(!is_empty_state(&telemetry));
733
734            // Test the exact moment the flush interval limit is met
735            executor.advance_clock(duration);
736
737            assert!(is_empty_state(&telemetry));
738        });
739    }
740
    // TODO:
    // - Test settings
    // - Update `FakeHttpClient` to keep track of the number of requests and assert on it
744
745    fn init_test(cx: &mut TestAppContext) {
746        cx.update(|cx| {
747            let settings_store = SettingsStore::test(cx);
748            cx.set_global(settings_store);
749        });
750    }
751
752    fn is_empty_state(telemetry: &Telemetry) -> bool {
753        telemetry.state.lock().events_queue.is_empty()
754            && telemetry.state.lock().flush_events_task.is_none()
755            && telemetry.state.lock().first_event_date_time.is_none()
756    }
757}