telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use chrono::{DateTime, Utc};
  5use futures::Future;
  6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  7use parking_lot::Mutex;
  8use serde::Serialize;
  9use settings::{Settings, SettingsStore};
 10use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
 11use sysinfo::{
 12    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 13};
 14use tempfile::NamedTempFile;
 15use util::http::{HttpClient, ZedHttpClient};
 16#[cfg(not(debug_assertions))]
 17use util::ResultExt;
 18use util::{channel::ReleaseChannel, TryFutureExt};
 19
 20use self::event_coalescer::EventCoalescer;
 21
 22pub struct Telemetry {
 23    http_client: Arc<ZedHttpClient>,
 24    executor: BackgroundExecutor,
 25    state: Arc<Mutex<TelemetryState>>,
 26}
 27
 28struct TelemetryState {
 29    settings: TelemetrySettings,
 30    metrics_id: Option<Arc<str>>,      // Per logged-in user
 31    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 32    session_id: Option<Arc<str>>,      // Per app launch
 33    release_channel: Option<&'static str>,
 34    app_metadata: AppMetadata,
 35    architecture: &'static str,
 36    events_queue: Vec<EventWrapper>,
 37    flush_events_task: Option<Task<()>>,
 38    log_file: Option<NamedTempFile>,
 39    is_staff: Option<bool>,
 40    first_event_date_time: Option<DateTime<Utc>>,
 41    event_coalescer: EventCoalescer,
 42    max_queue_size: usize,
 43}
 44
 45#[derive(Serialize, Debug)]
 46struct EventRequestBody {
 47    installation_id: Option<Arc<str>>,
 48    session_id: Option<Arc<str>>,
 49    is_staff: Option<bool>,
 50    app_version: Option<String>,
 51    os_name: &'static str,
 52    os_version: Option<String>,
 53    architecture: &'static str,
 54    release_channel: Option<&'static str>,
 55    events: Vec<EventWrapper>,
 56}
 57
 58#[derive(Serialize, Debug)]
 59struct EventWrapper {
 60    signed_in: bool,
 61    #[serde(flatten)]
 62    event: Event,
 63}
 64
 65#[derive(Clone, Debug, PartialEq, Serialize)]
 66#[serde(rename_all = "snake_case")]
 67pub enum AssistantKind {
 68    Panel,
 69    Inline,
 70}
 71
 72#[derive(Clone, Debug, PartialEq, Serialize)]
 73#[serde(tag = "type")]
 74pub enum Event {
 75    Editor {
 76        operation: &'static str,
 77        file_extension: Option<String>,
 78        vim_mode: bool,
 79        copilot_enabled: bool,
 80        copilot_enabled_for_language: bool,
 81        milliseconds_since_first_event: i64,
 82    },
 83    Copilot {
 84        suggestion_id: Option<String>,
 85        suggestion_accepted: bool,
 86        file_extension: Option<String>,
 87        milliseconds_since_first_event: i64,
 88    },
 89    Call {
 90        operation: &'static str,
 91        room_id: Option<u64>,
 92        channel_id: Option<u64>,
 93        milliseconds_since_first_event: i64,
 94    },
 95    Assistant {
 96        conversation_id: Option<String>,
 97        kind: AssistantKind,
 98        model: &'static str,
 99        milliseconds_since_first_event: i64,
100    },
101    Cpu {
102        usage_as_percentage: f32,
103        core_count: u32,
104        milliseconds_since_first_event: i64,
105    },
106    Memory {
107        memory_in_bytes: u64,
108        virtual_memory_in_bytes: u64,
109        milliseconds_since_first_event: i64,
110    },
111    App {
112        operation: String,
113        milliseconds_since_first_event: i64,
114    },
115    Setting {
116        setting: &'static str,
117        value: String,
118        milliseconds_since_first_event: i64,
119    },
120    Edit {
121        duration: i64,
122        environment: &'static str,
123        milliseconds_since_first_event: i64,
124    },
125    Action {
126        source: &'static str,
127        action: String,
128        milliseconds_since_first_event: i64,
129    },
130}
131
/// Initial queue length at which queued events are flushed immediately
/// (debug builds use a small value so flushes happen quickly).
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Initial queue length at which queued events are flushed immediately.
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between the first queued event and the timed flush (debug builds: one second).
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event and the timed flush (five minutes).
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
143
144impl Telemetry {
145    pub fn new(client: Arc<ZedHttpClient>, cx: &mut AppContext) -> Arc<Self> {
146        let release_channel = cx
147            .try_global::<ReleaseChannel>()
148            .map(|release_channel| release_channel.display_name());
149
150        TelemetrySettings::register(cx);
151
152        let state = Arc::new(Mutex::new(TelemetryState {
153            settings: TelemetrySettings::get_global(cx).clone(),
154            app_metadata: cx.app_metadata(),
155            architecture: env::consts::ARCH,
156            release_channel,
157            installation_id: None,
158            metrics_id: None,
159            session_id: None,
160            events_queue: Vec::new(),
161            flush_events_task: None,
162            log_file: None,
163            is_staff: None,
164            first_event_date_time: None,
165            event_coalescer: EventCoalescer::new(),
166            max_queue_size: MAX_QUEUE_LEN,
167        }));
168
169        #[cfg(not(debug_assertions))]
170        cx.background_executor()
171            .spawn({
172                let state = state.clone();
173                async move {
174                    if let Some(tempfile) =
175                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
176                    {
177                        state.lock().log_file = Some(tempfile);
178                    }
179                }
180            })
181            .detach();
182
183        cx.observe_global::<SettingsStore>({
184            let state = state.clone();
185
186            move |cx| {
187                let mut state = state.lock();
188                state.settings = TelemetrySettings::get_global(cx).clone();
189            }
190        })
191        .detach();
192
193        // TODO: Replace all hardware stuff with nested SystemSpecs json
194        let this = Arc::new(Self {
195            http_client: client,
196            executor: cx.background_executor().clone(),
197            state,
198        });
199
200        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
201        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
202        std::mem::forget(cx.on_app_quit({
203            let this = this.clone();
204            move |_| this.shutdown_telemetry()
205        }));
206
207        this
208    }
209
210    #[cfg(any(test, feature = "test-support"))]
211    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
212        Task::ready(())
213    }
214
215    // Skip calling this function in tests.
216    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
217    #[cfg(not(any(test, feature = "test-support")))]
218    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
219        self.report_app_event("close".to_string());
220        // TODO: close final edit period and make sure it's sent
221        Task::ready(())
222    }
223
224    pub fn log_file_path(&self) -> Option<PathBuf> {
225        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
226    }
227
228    pub fn start(
229        self: &Arc<Self>,
230        installation_id: Option<String>,
231        session_id: String,
232        cx: &mut AppContext,
233    ) {
234        let mut state = self.state.lock();
235        state.installation_id = installation_id.map(|id| id.into());
236        state.session_id = Some(session_id.into());
237        drop(state);
238
239        let this = self.clone();
240        cx.spawn(|_| async move {
241            // Avoiding calling `System::new_all()`, as there have been crashes related to it
242            let refresh_kind = RefreshKind::new()
243                .with_memory() // For memory usage
244                .with_processes(ProcessRefreshKind::everything()) // For process usage
245                .with_cpu(CpuRefreshKind::everything()); // For core count
246
247            let mut system = System::new_with_specifics(refresh_kind);
248
249            // Avoiding calling `refresh_all()`, just update what we need
250            system.refresh_specifics(refresh_kind);
251
252            // Waiting some amount of time before the first query is important to get a reasonable value
253            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
254            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
255
256            loop {
257                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
258
259                system.refresh_specifics(refresh_kind);
260
261                let current_process = Pid::from_u32(std::process::id());
262                let Some(process) = system.processes().get(&current_process) else {
263                    let process = current_process;
264                    log::error!("Failed to find own process {process:?} in system process table");
265                    // TODO: Fire an error telemetry event
266                    return;
267                };
268
269                this.report_memory_event(process.memory(), process.virtual_memory());
270                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
271            }
272        })
273        .detach();
274    }
275
276    pub fn set_authenticated_user_info(
277        self: &Arc<Self>,
278        metrics_id: Option<String>,
279        is_staff: bool,
280    ) {
281        let mut state = self.state.lock();
282
283        if !state.settings.metrics {
284            return;
285        }
286
287        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
288        state.metrics_id = metrics_id.clone();
289        state.is_staff = Some(is_staff);
290        drop(state);
291    }
292
293    pub fn report_editor_event(
294        self: &Arc<Self>,
295        file_extension: Option<String>,
296        vim_mode: bool,
297        operation: &'static str,
298        copilot_enabled: bool,
299        copilot_enabled_for_language: bool,
300    ) {
301        let event = Event::Editor {
302            file_extension,
303            vim_mode,
304            operation,
305            copilot_enabled,
306            copilot_enabled_for_language,
307            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
308        };
309
310        self.report_event(event)
311    }
312
313    pub fn report_copilot_event(
314        self: &Arc<Self>,
315        suggestion_id: Option<String>,
316        suggestion_accepted: bool,
317        file_extension: Option<String>,
318    ) {
319        let event = Event::Copilot {
320            suggestion_id,
321            suggestion_accepted,
322            file_extension,
323            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
324        };
325
326        self.report_event(event)
327    }
328
329    pub fn report_assistant_event(
330        self: &Arc<Self>,
331        conversation_id: Option<String>,
332        kind: AssistantKind,
333        model: &'static str,
334    ) {
335        let event = Event::Assistant {
336            conversation_id,
337            kind,
338            model,
339            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
340        };
341
342        self.report_event(event)
343    }
344
345    pub fn report_call_event(
346        self: &Arc<Self>,
347        operation: &'static str,
348        room_id: Option<u64>,
349        channel_id: Option<u64>,
350    ) {
351        let event = Event::Call {
352            operation,
353            room_id,
354            channel_id,
355            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
356        };
357
358        self.report_event(event)
359    }
360
361    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
362        let event = Event::Cpu {
363            usage_as_percentage,
364            core_count,
365            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
366        };
367
368        self.report_event(event)
369    }
370
371    pub fn report_memory_event(
372        self: &Arc<Self>,
373        memory_in_bytes: u64,
374        virtual_memory_in_bytes: u64,
375    ) {
376        let event = Event::Memory {
377            memory_in_bytes,
378            virtual_memory_in_bytes,
379            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
380        };
381
382        self.report_event(event)
383    }
384
385    pub fn report_app_event(self: &Arc<Self>, operation: String) {
386        self.report_app_event_with_date_time(operation, Utc::now());
387    }
388
389    fn report_app_event_with_date_time(
390        self: &Arc<Self>,
391        operation: String,
392        date_time: DateTime<Utc>,
393    ) -> Event {
394        let event = Event::App {
395            operation,
396            milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
397        };
398
399        self.report_event(event.clone());
400
401        event
402    }
403
404    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
405        let event = Event::Setting {
406            setting,
407            value,
408            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
409        };
410
411        self.report_event(event)
412    }
413
414    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
415        let mut state = self.state.lock();
416        let period_data = state.event_coalescer.log_event(environment);
417        drop(state);
418
419        if let Some((start, end, environment)) = period_data {
420            let event = Event::Edit {
421                duration: end.timestamp_millis() - start.timestamp_millis(),
422                environment,
423                milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
424            };
425
426            self.report_event(event);
427        }
428    }
429
430    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
431        let event = Event::Action {
432            source,
433            action,
434            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
435        };
436
437        self.report_event(event)
438    }
439
440    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
441        let mut state = self.state.lock();
442
443        match state.first_event_date_time {
444            Some(first_event_date_time) => {
445                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
446            }
447            None => {
448                state.first_event_date_time = Some(date_time);
449                0
450            }
451        }
452    }
453
454    fn report_event(self: &Arc<Self>, event: Event) {
455        let mut state = self.state.lock();
456
457        if !state.settings.metrics {
458            return;
459        }
460
461        if state.flush_events_task.is_none() {
462            let this = self.clone();
463            let executor = self.executor.clone();
464            state.flush_events_task = Some(self.executor.spawn(async move {
465                executor.timer(FLUSH_INTERVAL).await;
466                this.flush_events();
467            }));
468        }
469
470        let signed_in = state.metrics_id.is_some();
471        state.events_queue.push(EventWrapper { signed_in, event });
472
473        if state.installation_id.is_some() {
474            if state.events_queue.len() >= state.max_queue_size {
475                drop(state);
476                self.flush_events();
477            }
478        }
479    }
480
481    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
482        self.state.lock().metrics_id.clone()
483    }
484
485    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
486        self.state.lock().installation_id.clone()
487    }
488
489    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
490        self.state.lock().is_staff
491    }
492
493    pub fn flush_events(self: &Arc<Self>) {
494        let mut state = self.state.lock();
495        state.first_event_date_time = None;
496        let mut events = mem::take(&mut state.events_queue);
497        state.flush_events_task.take();
498        drop(state);
499        if events.is_empty() {
500            return;
501        }
502
503        let this = self.clone();
504        self.executor
505            .spawn(
506                async move {
507                    let mut json_bytes = Vec::new();
508
509                    if let Some(file) = &mut this.state.lock().log_file {
510                        let file = file.as_file_mut();
511                        for event in &mut events {
512                            json_bytes.clear();
513                            serde_json::to_writer(&mut json_bytes, event)?;
514                            file.write_all(&json_bytes)?;
515                            file.write(b"\n")?;
516                        }
517                    }
518
519                    {
520                        let state = this.state.lock();
521                        let request_body = EventRequestBody {
522                            installation_id: state.installation_id.clone(),
523                            session_id: state.session_id.clone(),
524                            is_staff: state.is_staff.clone(),
525                            app_version: state
526                                .app_metadata
527                                .app_version
528                                .map(|version| version.to_string()),
529                            os_name: state.app_metadata.os_name,
530                            os_version: state
531                                .app_metadata
532                                .os_version
533                                .map(|version| version.to_string()),
534                            architecture: state.architecture,
535
536                            release_channel: state.release_channel,
537                            events,
538                        };
539                        json_bytes.clear();
540                        serde_json::to_writer(&mut json_bytes, &request_body)?;
541                    }
542
543                    this.http_client
544                        .post_json(&this.http_client.zed_url("/api/events"), json_bytes.into())
545                        .await?;
546                    anyhow::Ok(())
547                }
548                .log_err(),
549            )
550            .detach();
551    }
552}
553
554#[cfg(test)]
555mod tests {
556    use super::*;
557    use chrono::TimeZone;
558    use gpui::TestAppContext;
559    use util::http::FakeHttpClient;
560
561    #[gpui::test]
562    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
563        init_test(cx);
564        let http = FakeHttpClient::with_200_response();
565        let installation_id = Some("installation_id".to_string());
566        let session_id = "session_id".to_string();
567
568        cx.update(|cx| {
569            let telemetry = Telemetry::new(http, cx);
570
571            telemetry.state.lock().max_queue_size = 4;
572            telemetry.start(installation_id, session_id, cx);
573
574            assert!(is_empty_state(&telemetry));
575
576            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
577            let operation = "test".to_string();
578
579            let event =
580                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
581            assert_eq!(
582                event,
583                Event::App {
584                    operation: operation.clone(),
585                    milliseconds_since_first_event: 0
586                }
587            );
588            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
589            assert!(telemetry.state.lock().flush_events_task.is_some());
590            assert_eq!(
591                telemetry.state.lock().first_event_date_time,
592                Some(first_date_time)
593            );
594
595            let mut date_time = first_date_time + chrono::Duration::milliseconds(100);
596
597            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
598            assert_eq!(
599                event,
600                Event::App {
601                    operation: operation.clone(),
602                    milliseconds_since_first_event: 100
603                }
604            );
605            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
606            assert!(telemetry.state.lock().flush_events_task.is_some());
607            assert_eq!(
608                telemetry.state.lock().first_event_date_time,
609                Some(first_date_time)
610            );
611
612            date_time += chrono::Duration::milliseconds(100);
613
614            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
615            assert_eq!(
616                event,
617                Event::App {
618                    operation: operation.clone(),
619                    milliseconds_since_first_event: 200
620                }
621            );
622            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
623            assert!(telemetry.state.lock().flush_events_task.is_some());
624            assert_eq!(
625                telemetry.state.lock().first_event_date_time,
626                Some(first_date_time)
627            );
628
629            date_time += chrono::Duration::milliseconds(100);
630
631            // Adding a 4th event should cause a flush
632            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
633            assert_eq!(
634                event,
635                Event::App {
636                    operation: operation.clone(),
637                    milliseconds_since_first_event: 300
638                }
639            );
640
641            assert!(is_empty_state(&telemetry));
642        });
643    }
644
645    #[gpui::test]
646    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
647        init_test(cx);
648        let http = FakeHttpClient::with_200_response();
649        let installation_id = Some("installation_id".to_string());
650        let session_id = "session_id".to_string();
651
652        cx.update(|cx| {
653            let telemetry = Telemetry::new(http, cx);
654            telemetry.state.lock().max_queue_size = 4;
655            telemetry.start(installation_id, session_id, cx);
656
657            assert!(is_empty_state(&telemetry));
658
659            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
660            let operation = "test".to_string();
661
662            let event =
663                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
664            assert_eq!(
665                event,
666                Event::App {
667                    operation: operation.clone(),
668                    milliseconds_since_first_event: 0
669                }
670            );
671            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
672            assert!(telemetry.state.lock().flush_events_task.is_some());
673            assert_eq!(
674                telemetry.state.lock().first_event_date_time,
675                Some(first_date_time)
676            );
677
678            let duration = Duration::from_millis(1);
679
680            // Test 1 millisecond before the flush interval limit is met
681            executor.advance_clock(FLUSH_INTERVAL - duration);
682
683            assert!(!is_empty_state(&telemetry));
684
685            // Test the exact moment the flush interval limit is met
686            executor.advance_clock(duration);
687
688            assert!(is_empty_state(&telemetry));
689        });
690    }
691
    // TODO:
    // - Test the effect of the telemetry settings.
    // - Update FakeHttpClient to keep track of the number of requests, and assert on it.
695
696    fn init_test(cx: &mut TestAppContext) {
697        cx.update(|cx| {
698            let settings_store = SettingsStore::test(cx);
699            cx.set_global(settings_store);
700        });
701    }
702
703    fn is_empty_state(telemetry: &Telemetry) -> bool {
704        telemetry.state.lock().events_queue.is_empty()
705            && telemetry.state.lock().flush_events_task.is_none()
706            && telemetry.state.lock().first_event_date_time.is_none()
707    }
708}