telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{TelemetrySettings, ZED_SERVER_URL};
  4use chrono::{DateTime, Utc};
  5use futures::Future;
  6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  7use lazy_static::lazy_static;
  8use parking_lot::Mutex;
  9use serde::Serialize;
 10use settings::{Settings, SettingsStore};
 11use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
 12use sysinfo::{
 13    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 14};
 15use tempfile::NamedTempFile;
 16use util::http::HttpClient;
 17#[cfg(not(debug_assertions))]
 18use util::ResultExt;
 19use util::{channel::ReleaseChannel, TryFutureExt};
 20
 21use self::event_coalescer::EventCoalescer;
 22
/// Collects telemetry events, batches them, and POSTs each batch to the Zed server's
/// events endpoint.
///
/// Handed out as an `Arc<Telemetry>` by [`Telemetry::new`]; all mutable data lives in
/// `state` behind a single mutex so it can be shared with spawned tasks.
pub struct Telemetry {
    /// Client used to POST event batches.
    http_client: Arc<dyn HttpClient>,
    /// Executor running the flush timer and the upload tasks.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with spawned tasks and the settings observer.
    state: Arc<Mutex<TelemetryState>>,
}
 28
/// Mutable state of a [`Telemetry`] instance, guarded by its mutex.
struct TelemetryState {
    /// Copy of the user's telemetry settings, refreshed whenever the `SettingsStore` changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<Arc<str>>,      // Per app launch
    /// Display name of the release channel, if a `ReleaseChannel` global was set.
    release_channel: Option<&'static str>,
    /// App version and OS name/version, sent with every batch.
    app_metadata: AppMetadata,
    /// CPU architecture (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting to be flushed.
    events_queue: Vec<EventWrapper>,
    /// Pending timer-driven flush; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Local JSON-lines copy of flushed events; only created in release builds.
    log_file: Option<NamedTempFile>,
    /// Whether the signed-in user is staff; `None` until user info is set.
    is_staff: Option<bool>,
    /// Timestamp of the first event of the current batch; later events are stamped
    /// relative to it. Cleared on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Groups edit events into periods; `log_edit_event` reports a period when this yields one.
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush (initialized to `MAX_QUEUE_LEN`).
    max_queue_size: usize,
}
 45
 46const EVENTS_URL_PATH: &'static str = "/api/events";
 47
 48lazy_static! {
 49    static ref EVENTS_URL: String = format!("{}{}", *ZED_SERVER_URL, EVENTS_URL_PATH);
 50}
 51
/// JSON body POSTed to [`EVENTS_URL`]: session-level metadata plus one batch of events.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    /// Stringified app version, if known.
    app_version: Option<String>,
    os_name: &'static str,
    /// Stringified OS version, if known.
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    /// The drained event queue.
    events: Vec<EventWrapper>,
}
 64
/// A queued event together with whether a metrics id (signed-in user) was set when it
/// was reported.
#[derive(Serialize, Debug)]
struct EventWrapper {
    signed_in: bool,
    // Flattened so the event's own fields sit next to `signed_in` in one JSON object.
    #[serde(flatten)]
    event: Event,
}
 71
/// Which assistant surface produced an [`Event::Assistant`]; serialized as
/// `"panel"` or `"inline"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
 78
/// A single telemetry event.
///
/// Serialized with the variant name in a `type` field. Every variant carries
/// `milliseconds_since_first_event`, the offset from the first event of the current
/// batch.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// Editor activity (see `Telemetry::report_editor_event`).
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// A Copilot suggestion outcome (see `Telemetry::report_copilot_event`).
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// Call/room activity (see `Telemetry::report_call_event`).
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// Assistant usage (see `Telemetry::report_assistant_event`).
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Periodic CPU sample of this process, taken by the loop spawned in `Telemetry::start`.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// Periodic memory sample of this process, taken by the loop spawned in `Telemetry::start`.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// App-level operation, e.g. `"close"` on shutdown.
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// A setting and its value (see `Telemetry::report_setting_event`).
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// A completed editing period from the event coalescer.
    Edit {
        /// Length of the editing period in milliseconds.
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// An action invocation and where it came from (see `Telemetry::report_action_event`).
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
138
/// Default queue length at which `report_event` flushes immediately (once an installation
/// id is known). Kept small in debug builds so flushing is easy to observe.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// How long the first event of a batch may wait before a timer-driven flush.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
150
151impl Telemetry {
152    pub fn new(client: Arc<dyn HttpClient>, cx: &mut AppContext) -> Arc<Self> {
153        let release_channel = cx
154            .try_global::<ReleaseChannel>()
155            .map(|release_channel| release_channel.display_name());
156
157        TelemetrySettings::register(cx);
158
159        let state = Arc::new(Mutex::new(TelemetryState {
160            settings: TelemetrySettings::get_global(cx).clone(),
161            app_metadata: cx.app_metadata(),
162            architecture: env::consts::ARCH,
163            release_channel,
164            installation_id: None,
165            metrics_id: None,
166            session_id: None,
167            events_queue: Vec::new(),
168            flush_events_task: None,
169            log_file: None,
170            is_staff: None,
171            first_event_date_time: None,
172            event_coalescer: EventCoalescer::new(),
173            max_queue_size: MAX_QUEUE_LEN,
174        }));
175
176        #[cfg(not(debug_assertions))]
177        cx.background_executor()
178            .spawn({
179                let state = state.clone();
180                async move {
181                    if let Some(tempfile) =
182                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
183                    {
184                        state.lock().log_file = Some(tempfile);
185                    }
186                }
187            })
188            .detach();
189
190        cx.observe_global::<SettingsStore>({
191            let state = state.clone();
192
193            move |cx| {
194                let mut state = state.lock();
195                state.settings = TelemetrySettings::get_global(cx).clone();
196            }
197        })
198        .detach();
199
200        // TODO: Replace all hardware stuff with nested SystemSpecs json
201        let this = Arc::new(Self {
202            http_client: client,
203            executor: cx.background_executor().clone(),
204            state,
205        });
206
207        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
208        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
209        std::mem::forget(cx.on_app_quit({
210            let this = this.clone();
211            move |_| this.shutdown_telemetry()
212        }));
213
214        this
215    }
216
217    #[cfg(any(test, feature = "test-support"))]
218    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
219        Task::ready(())
220    }
221
222    // Skip calling this function in tests.
223    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
224    #[cfg(not(any(test, feature = "test-support")))]
225    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
226        self.report_app_event("close".to_string());
227        // TODO: close final edit period and make sure it's sent
228        Task::ready(())
229    }
230
231    pub fn log_file_path(&self) -> Option<PathBuf> {
232        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
233    }
234
235    pub fn start(
236        self: &Arc<Self>,
237        installation_id: Option<String>,
238        session_id: String,
239        cx: &mut AppContext,
240    ) {
241        let mut state = self.state.lock();
242        state.installation_id = installation_id.map(|id| id.into());
243        state.session_id = Some(session_id.into());
244        drop(state);
245
246        let this = self.clone();
247        cx.spawn(|_| async move {
248            // Avoiding calling `System::new_all()`, as there have been crashes related to it
249            let refresh_kind = RefreshKind::new()
250                .with_memory() // For memory usage
251                .with_processes(ProcessRefreshKind::everything()) // For process usage
252                .with_cpu(CpuRefreshKind::everything()); // For core count
253
254            let mut system = System::new_with_specifics(refresh_kind);
255
256            // Avoiding calling `refresh_all()`, just update what we need
257            system.refresh_specifics(refresh_kind);
258
259            // Waiting some amount of time before the first query is important to get a reasonable value
260            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
261            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
262
263            loop {
264                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
265
266                system.refresh_specifics(refresh_kind);
267
268                let current_process = Pid::from_u32(std::process::id());
269                let Some(process) = system.processes().get(&current_process) else {
270                    let process = current_process;
271                    log::error!("Failed to find own process {process:?} in system process table");
272                    // TODO: Fire an error telemetry event
273                    return;
274                };
275
276                this.report_memory_event(process.memory(), process.virtual_memory());
277                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
278            }
279        })
280        .detach();
281    }
282
283    pub fn set_authenticated_user_info(
284        self: &Arc<Self>,
285        metrics_id: Option<String>,
286        is_staff: bool,
287    ) {
288        let mut state = self.state.lock();
289
290        if !state.settings.metrics {
291            return;
292        }
293
294        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
295        state.metrics_id = metrics_id.clone();
296        state.is_staff = Some(is_staff);
297        drop(state);
298    }
299
300    pub fn report_editor_event(
301        self: &Arc<Self>,
302        file_extension: Option<String>,
303        vim_mode: bool,
304        operation: &'static str,
305        copilot_enabled: bool,
306        copilot_enabled_for_language: bool,
307    ) {
308        let event = Event::Editor {
309            file_extension,
310            vim_mode,
311            operation,
312            copilot_enabled,
313            copilot_enabled_for_language,
314            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
315        };
316
317        self.report_event(event)
318    }
319
320    pub fn report_copilot_event(
321        self: &Arc<Self>,
322        suggestion_id: Option<String>,
323        suggestion_accepted: bool,
324        file_extension: Option<String>,
325    ) {
326        let event = Event::Copilot {
327            suggestion_id,
328            suggestion_accepted,
329            file_extension,
330            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
331        };
332
333        self.report_event(event)
334    }
335
336    pub fn report_assistant_event(
337        self: &Arc<Self>,
338        conversation_id: Option<String>,
339        kind: AssistantKind,
340        model: &'static str,
341    ) {
342        let event = Event::Assistant {
343            conversation_id,
344            kind,
345            model,
346            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
347        };
348
349        self.report_event(event)
350    }
351
352    pub fn report_call_event(
353        self: &Arc<Self>,
354        operation: &'static str,
355        room_id: Option<u64>,
356        channel_id: Option<u64>,
357    ) {
358        let event = Event::Call {
359            operation,
360            room_id,
361            channel_id,
362            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
363        };
364
365        self.report_event(event)
366    }
367
368    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
369        let event = Event::Cpu {
370            usage_as_percentage,
371            core_count,
372            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
373        };
374
375        self.report_event(event)
376    }
377
378    pub fn report_memory_event(
379        self: &Arc<Self>,
380        memory_in_bytes: u64,
381        virtual_memory_in_bytes: u64,
382    ) {
383        let event = Event::Memory {
384            memory_in_bytes,
385            virtual_memory_in_bytes,
386            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
387        };
388
389        self.report_event(event)
390    }
391
392    pub fn report_app_event(self: &Arc<Self>, operation: String) {
393        self.report_app_event_with_date_time(operation, Utc::now());
394    }
395
396    fn report_app_event_with_date_time(
397        self: &Arc<Self>,
398        operation: String,
399        date_time: DateTime<Utc>,
400    ) -> Event {
401        let event = Event::App {
402            operation,
403            milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
404        };
405
406        self.report_event(event.clone());
407
408        event
409    }
410
411    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
412        let event = Event::Setting {
413            setting,
414            value,
415            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
416        };
417
418        self.report_event(event)
419    }
420
421    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
422        let mut state = self.state.lock();
423        let period_data = state.event_coalescer.log_event(environment);
424        drop(state);
425
426        if let Some((start, end, environment)) = period_data {
427            let event = Event::Edit {
428                duration: end.timestamp_millis() - start.timestamp_millis(),
429                environment,
430                milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
431            };
432
433            self.report_event(event);
434        }
435    }
436
437    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
438        let event = Event::Action {
439            source,
440            action,
441            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
442        };
443
444        self.report_event(event)
445    }
446
447    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
448        let mut state = self.state.lock();
449
450        match state.first_event_date_time {
451            Some(first_event_date_time) => {
452                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
453            }
454            None => {
455                state.first_event_date_time = Some(date_time);
456                0
457            }
458        }
459    }
460
461    fn report_event(self: &Arc<Self>, event: Event) {
462        let mut state = self.state.lock();
463
464        if !state.settings.metrics {
465            return;
466        }
467
468        if state.flush_events_task.is_none() {
469            let this = self.clone();
470            let executor = self.executor.clone();
471            state.flush_events_task = Some(self.executor.spawn(async move {
472                executor.timer(FLUSH_INTERVAL).await;
473                this.flush_events();
474            }));
475        }
476
477        let signed_in = state.metrics_id.is_some();
478        state.events_queue.push(EventWrapper { signed_in, event });
479
480        if state.installation_id.is_some() {
481            if state.events_queue.len() >= state.max_queue_size {
482                drop(state);
483                self.flush_events();
484            }
485        }
486    }
487
488    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
489        self.state.lock().metrics_id.clone()
490    }
491
492    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
493        self.state.lock().installation_id.clone()
494    }
495
496    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
497        self.state.lock().is_staff
498    }
499
500    pub fn flush_events(self: &Arc<Self>) {
501        let mut state = self.state.lock();
502        state.first_event_date_time = None;
503        let mut events = mem::take(&mut state.events_queue);
504        state.flush_events_task.take();
505        drop(state);
506        if events.is_empty() {
507            return;
508        }
509
510        let this = self.clone();
511        self.executor
512            .spawn(
513                async move {
514                    let mut json_bytes = Vec::new();
515
516                    if let Some(file) = &mut this.state.lock().log_file {
517                        let file = file.as_file_mut();
518                        for event in &mut events {
519                            json_bytes.clear();
520                            serde_json::to_writer(&mut json_bytes, event)?;
521                            file.write_all(&json_bytes)?;
522                            file.write(b"\n")?;
523                        }
524                    }
525
526                    {
527                        let state = this.state.lock();
528                        let request_body = EventRequestBody {
529                            installation_id: state.installation_id.clone(),
530                            session_id: state.session_id.clone(),
531                            is_staff: state.is_staff.clone(),
532                            app_version: state
533                                .app_metadata
534                                .app_version
535                                .map(|version| version.to_string()),
536                            os_name: state.app_metadata.os_name,
537                            os_version: state
538                                .app_metadata
539                                .os_version
540                                .map(|version| version.to_string()),
541                            architecture: state.architecture,
542
543                            release_channel: state.release_channel,
544                            events,
545                        };
546                        json_bytes.clear();
547                        serde_json::to_writer(&mut json_bytes, &request_body)?;
548                    }
549
550                    this.http_client
551                        .post_json(EVENTS_URL.as_str(), json_bytes.into())
552                        .await?;
553                    anyhow::Ok(())
554                }
555                .log_err(),
556            )
557            .detach();
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();

            // Three events 100ms apart stay queued, each stamped relative to the first.
            for (index, offset_ms) in [0i64, 100, 200].iter().copied().enumerate() {
                let date_time = first_date_time + chrono::Duration::milliseconds(offset_ms);
                report_and_check(&telemetry, "test", date_time, offset_ms);
                assert_pending(&telemetry, index + 1, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let fourth_date_time = first_date_time + chrono::Duration::milliseconds(300);
            report_and_check(&telemetry, "test", fourth_date_time, 300);

            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            report_and_check(&telemetry, "test", first_date_time, 0);
            assert_pending(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Reports an `App` event with `operation` at `date_time` and asserts that the
    /// returned event is stamped `expected_ms` after the batch's first event.
    #[track_caller]
    fn report_and_check(
        telemetry: &Arc<Telemetry>,
        operation: &str,
        date_time: DateTime<Utc>,
        expected_ms: i64,
    ) {
        let reported = telemetry.report_app_event_with_date_time(operation.to_string(), date_time);
        assert_eq!(
            reported,
            Event::App {
                operation: operation.to_string(),
                milliseconds_since_first_event: expected_ms,
            }
        );
    }

    /// Asserts that `expected_len` events are queued, a timer flush is scheduled, and the
    /// batch still starts at `first_date_time`.
    #[track_caller]
    fn assert_pending(telemetry: &Telemetry, expected_len: usize, first_date_time: DateTime<Utc>) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// Installs a test `SettingsStore` global so `Telemetry::new` can read its settings.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            cx.set_global(SettingsStore::test(cx));
        });
    }

    /// True when nothing is queued, no flush is scheduled, and no batch has started.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}