telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  4use chrono::{DateTime, Utc};
  5use futures::Future;
  6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  7use lazy_static::lazy_static;
  8use parking_lot::Mutex;
  9use serde::Serialize;
 10use settings::{Settings, SettingsStore};
 11use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
 12use sysinfo::{
 13    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 14};
 15use tempfile::NamedTempFile;
 16use util::http::HttpClient;
 17#[cfg(not(debug_assertions))]
 18use util::ResultExt;
 19use util::{channel::ReleaseChannel, TryFutureExt};
 20
 21use self::event_coalescer::EventCoalescer;
 22
/// Client-side telemetry handle.
///
/// Events are queued in the shared [`TelemetryState`] by the `report_*` methods and sent in
/// batches by `flush_events`.
pub struct Telemetry {
    /// HTTP client used by `flush_events` to POST the serialized event batch.
    http_client: Arc<dyn HttpClient>,
    /// Background executor used for the flush timer and for the upload task.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with the tasks and observers spawned by this type.
    state: Arc<Mutex<TelemetryState>>,
}
 28
/// Mutable state behind [`Telemetry`], always accessed through the `Mutex` in `Telemetry::state`.
struct TelemetryState {
    /// Copy of the global telemetry settings; refreshed whenever the `SettingsStore` global changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<Arc<str>>,      // Per app launch
    /// Display name of the release channel, if a `ReleaseChannel` global was set at construction.
    release_channel: Option<&'static str>,
    /// App/OS metadata captured at construction and copied into every request body.
    app_metadata: AppMetadata,
    /// CPU architecture string (`env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting to be sent by the next flush.
    events_queue: Vec<EventWrapper>,
    /// Timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Optional local file that flushed events are appended to, one JSON object per line.
    log_file: Option<NamedTempFile>,
    /// Staff flag supplied via `set_authenticated_user_info`; `None` until then.
    is_staff: Option<bool>,
    /// Timestamp of the first event since the last flush; baseline for
    /// `milliseconds_since_first_event`.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Turns individual edit notifications into edit periods (see `log_edit_event`).
    event_coalescer: EventCoalescer,
    /// Queue length at which `report_event` flushes immediately, once an installation id is set.
    max_queue_size: usize,
}
 45
 46const EVENTS_URL_PATH: &'static str = "/api/events";
 47
 48lazy_static! {
 49    static ref EVENTS_URL: String = format!("{}{}", *ZED_SERVER_URL, EVENTS_URL_PATH);
 50}
 51
/// JSON body of one telemetry upload, built in `flush_events`.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    /// Client token (`ZED_SECRET_CLIENT_TOKEN`).
    token: &'static str,
    /// Installation identifier, if one was provided to `start`.
    installation_id: Option<Arc<str>>,
    /// Identifier of the current app launch, if `start` has been called.
    session_id: Option<Arc<str>>,
    /// Staff flag of the authenticated user, if known.
    is_staff: Option<bool>,
    /// Stringified app version from the app metadata, if available.
    app_version: Option<String>,
    /// Operating system name from the app metadata.
    os_name: &'static str,
    /// Stringified OS version from the app metadata, if available.
    os_version: Option<String>,
    /// CPU architecture string.
    architecture: &'static str,
    /// Release channel display name, if known.
    release_channel: Option<&'static str>,
    /// The batch of events being uploaded.
    events: Vec<EventWrapper>,
}
 65
/// A queued event together with the sign-in status at the time it was reported.
#[derive(Serialize, Debug)]
struct EventWrapper {
    /// Whether a metrics id was set when the event was queued.
    signed_in: bool,
    /// The event itself; its fields are serialized inline next to `signed_in`.
    #[serde(flatten)]
    event: Event,
}
 72
/// Which assistant surface an [`Event::Assistant`] came from; serialized in snake_case.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    /// The assistant panel.
    Panel,
    /// The inline assistant.
    Inline,
}
 79
/// A single telemetry event.
///
/// Serialized with a `type` field naming the variant. Every variant carries
/// `milliseconds_since_first_event`: the offset, in milliseconds, from the first event recorded
/// since the last flush.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// Reported by `report_editor_event`.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `report_copilot_event`.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `report_call_event`.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `report_assistant_event`.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// CPU usage of the current process and the number of CPUs, sampled periodically by `start`.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// Memory usage of the current process, sampled periodically by `start`.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `report_app_event` (for example the "close" operation on shutdown).
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `report_setting_event`.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// A coalesced edit period reported by `log_edit_event`; `duration` is in milliseconds.
    Edit {
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Reported by `report_action_event`.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
139
/// Default queue length that triggers an immediate flush: 5 in debug builds, 50 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Delay before a scheduled flush fires: 1 second in debug builds, 5 minutes otherwise.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
151
152impl Telemetry {
153    pub fn new(client: Arc<dyn HttpClient>, cx: &mut AppContext) -> Arc<Self> {
154        let release_channel = cx
155            .try_global::<ReleaseChannel>()
156            .map(|release_channel| release_channel.display_name());
157
158        TelemetrySettings::register(cx);
159
160        let state = Arc::new(Mutex::new(TelemetryState {
161            settings: TelemetrySettings::get_global(cx).clone(),
162            app_metadata: cx.app_metadata(),
163            architecture: env::consts::ARCH,
164            release_channel,
165            installation_id: None,
166            metrics_id: None,
167            session_id: None,
168            events_queue: Vec::new(),
169            flush_events_task: None,
170            log_file: None,
171            is_staff: None,
172            first_event_date_time: None,
173            event_coalescer: EventCoalescer::new(),
174            max_queue_size: MAX_QUEUE_LEN,
175        }));
176
177        #[cfg(not(debug_assertions))]
178        cx.background_executor()
179            .spawn({
180                let state = state.clone();
181                async move {
182                    if let Some(tempfile) =
183                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
184                    {
185                        state.lock().log_file = Some(tempfile);
186                    }
187                }
188            })
189            .detach();
190
191        cx.observe_global::<SettingsStore>({
192            let state = state.clone();
193
194            move |cx| {
195                let mut state = state.lock();
196                state.settings = TelemetrySettings::get_global(cx).clone();
197            }
198        })
199        .detach();
200
201        // TODO: Replace all hardware stuff with nested SystemSpecs json
202        let this = Arc::new(Self {
203            http_client: client,
204            executor: cx.background_executor().clone(),
205            state,
206        });
207
208        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
209        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
210        std::mem::forget(cx.on_app_quit({
211            let this = this.clone();
212            move |_| this.shutdown_telemetry()
213        }));
214
215        this
216    }
217
218    #[cfg(any(test, feature = "test-support"))]
219    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
220        Task::ready(())
221    }
222
223    // Skip calling this function in tests.
224    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
225    #[cfg(not(any(test, feature = "test-support")))]
226    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
227        self.report_app_event("close".to_string());
228        // TODO: close final edit period and make sure it's sent
229        Task::ready(())
230    }
231
232    pub fn log_file_path(&self) -> Option<PathBuf> {
233        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
234    }
235
236    pub fn start(
237        self: &Arc<Self>,
238        installation_id: Option<String>,
239        session_id: String,
240        cx: &mut AppContext,
241    ) {
242        let mut state = self.state.lock();
243        state.installation_id = installation_id.map(|id| id.into());
244        state.session_id = Some(session_id.into());
245        drop(state);
246
247        let this = self.clone();
248        cx.spawn(|_| async move {
249            // Avoiding calling `System::new_all()`, as there have been crashes related to it
250            let refresh_kind = RefreshKind::new()
251                .with_memory() // For memory usage
252                .with_processes(ProcessRefreshKind::everything()) // For process usage
253                .with_cpu(CpuRefreshKind::everything()); // For core count
254
255            let mut system = System::new_with_specifics(refresh_kind);
256
257            // Avoiding calling `refresh_all()`, just update what we need
258            system.refresh_specifics(refresh_kind);
259
260            // Waiting some amount of time before the first query is important to get a reasonable value
261            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
262            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
263
264            loop {
265                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
266
267                system.refresh_specifics(refresh_kind);
268
269                let current_process = Pid::from_u32(std::process::id());
270                let Some(process) = system.processes().get(&current_process) else {
271                    let process = current_process;
272                    log::error!("Failed to find own process {process:?} in system process table");
273                    // TODO: Fire an error telemetry event
274                    return;
275                };
276
277                this.report_memory_event(process.memory(), process.virtual_memory());
278                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
279            }
280        })
281        .detach();
282    }
283
284    pub fn set_authenticated_user_info(
285        self: &Arc<Self>,
286        metrics_id: Option<String>,
287        is_staff: bool,
288    ) {
289        let mut state = self.state.lock();
290
291        if !state.settings.metrics {
292            return;
293        }
294
295        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
296        state.metrics_id = metrics_id.clone();
297        state.is_staff = Some(is_staff);
298        drop(state);
299    }
300
301    pub fn report_editor_event(
302        self: &Arc<Self>,
303        file_extension: Option<String>,
304        vim_mode: bool,
305        operation: &'static str,
306        copilot_enabled: bool,
307        copilot_enabled_for_language: bool,
308    ) {
309        let event = Event::Editor {
310            file_extension,
311            vim_mode,
312            operation,
313            copilot_enabled,
314            copilot_enabled_for_language,
315            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
316        };
317
318        self.report_event(event)
319    }
320
321    pub fn report_copilot_event(
322        self: &Arc<Self>,
323        suggestion_id: Option<String>,
324        suggestion_accepted: bool,
325        file_extension: Option<String>,
326    ) {
327        let event = Event::Copilot {
328            suggestion_id,
329            suggestion_accepted,
330            file_extension,
331            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
332        };
333
334        self.report_event(event)
335    }
336
337    pub fn report_assistant_event(
338        self: &Arc<Self>,
339        conversation_id: Option<String>,
340        kind: AssistantKind,
341        model: &'static str,
342    ) {
343        let event = Event::Assistant {
344            conversation_id,
345            kind,
346            model,
347            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
348        };
349
350        self.report_event(event)
351    }
352
353    pub fn report_call_event(
354        self: &Arc<Self>,
355        operation: &'static str,
356        room_id: Option<u64>,
357        channel_id: Option<u64>,
358    ) {
359        let event = Event::Call {
360            operation,
361            room_id,
362            channel_id,
363            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
364        };
365
366        self.report_event(event)
367    }
368
369    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
370        let event = Event::Cpu {
371            usage_as_percentage,
372            core_count,
373            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
374        };
375
376        self.report_event(event)
377    }
378
379    pub fn report_memory_event(
380        self: &Arc<Self>,
381        memory_in_bytes: u64,
382        virtual_memory_in_bytes: u64,
383    ) {
384        let event = Event::Memory {
385            memory_in_bytes,
386            virtual_memory_in_bytes,
387            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
388        };
389
390        self.report_event(event)
391    }
392
393    pub fn report_app_event(self: &Arc<Self>, operation: String) {
394        self.report_app_event_with_date_time(operation, Utc::now());
395    }
396
397    fn report_app_event_with_date_time(
398        self: &Arc<Self>,
399        operation: String,
400        date_time: DateTime<Utc>,
401    ) -> Event {
402        let event = Event::App {
403            operation,
404            milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
405        };
406
407        self.report_event(event.clone());
408
409        event
410    }
411
412    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
413        let event = Event::Setting {
414            setting,
415            value,
416            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
417        };
418
419        self.report_event(event)
420    }
421
422    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
423        let mut state = self.state.lock();
424        let period_data = state.event_coalescer.log_event(environment);
425        drop(state);
426
427        if let Some((start, end, environment)) = period_data {
428            let event = Event::Edit {
429                duration: end.timestamp_millis() - start.timestamp_millis(),
430                environment,
431                milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
432            };
433
434            self.report_event(event);
435        }
436    }
437
438    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
439        let event = Event::Action {
440            source,
441            action,
442            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
443        };
444
445        self.report_event(event)
446    }
447
448    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
449        let mut state = self.state.lock();
450
451        match state.first_event_date_time {
452            Some(first_event_date_time) => {
453                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
454            }
455            None => {
456                state.first_event_date_time = Some(date_time);
457                0
458            }
459        }
460    }
461
462    fn report_event(self: &Arc<Self>, event: Event) {
463        let mut state = self.state.lock();
464
465        if !state.settings.metrics {
466            return;
467        }
468
469        if state.flush_events_task.is_none() {
470            let this = self.clone();
471            let executor = self.executor.clone();
472            state.flush_events_task = Some(self.executor.spawn(async move {
473                executor.timer(FLUSH_INTERVAL).await;
474                this.flush_events();
475            }));
476        }
477
478        let signed_in = state.metrics_id.is_some();
479        state.events_queue.push(EventWrapper { signed_in, event });
480
481        if state.installation_id.is_some() {
482            if state.events_queue.len() >= state.max_queue_size {
483                drop(state);
484                self.flush_events();
485            }
486        }
487    }
488
489    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
490        self.state.lock().metrics_id.clone()
491    }
492
493    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
494        self.state.lock().installation_id.clone()
495    }
496
497    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
498        self.state.lock().is_staff
499    }
500
501    pub fn flush_events(self: &Arc<Self>) {
502        let mut state = self.state.lock();
503        state.first_event_date_time = None;
504        let mut events = mem::take(&mut state.events_queue);
505        state.flush_events_task.take();
506        drop(state);
507        if events.is_empty() {
508            return;
509        }
510
511        let this = self.clone();
512        self.executor
513            .spawn(
514                async move {
515                    let mut json_bytes = Vec::new();
516
517                    if let Some(file) = &mut this.state.lock().log_file {
518                        let file = file.as_file_mut();
519                        for event in &mut events {
520                            json_bytes.clear();
521                            serde_json::to_writer(&mut json_bytes, event)?;
522                            file.write_all(&json_bytes)?;
523                            file.write(b"\n")?;
524                        }
525                    }
526
527                    {
528                        let state = this.state.lock();
529                        let request_body = EventRequestBody {
530                            token: ZED_SECRET_CLIENT_TOKEN,
531                            installation_id: state.installation_id.clone(),
532                            session_id: state.session_id.clone(),
533                            is_staff: state.is_staff.clone(),
534                            app_version: state
535                                .app_metadata
536                                .app_version
537                                .map(|version| version.to_string()),
538                            os_name: state.app_metadata.os_name,
539                            os_version: state
540                                .app_metadata
541                                .os_version
542                                .map(|version| version.to_string()),
543                            architecture: state.architecture,
544
545                            release_channel: state.release_channel,
546                            events,
547                        };
548                        json_bytes.clear();
549                        serde_json::to_writer(&mut json_bytes, &request_body)?;
550                    }
551
552                    this.http_client
553                        .post_json(EVENTS_URL.as_str(), json_bytes.into())
554                        .await?;
555                    anyhow::Ok(())
556                }
557                .log_err(),
558            )
559            .detach();
560    }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Filling the queue up to `max_queue_size` must flush it and reset the state.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            // The first three events, 100ms apart, only accumulate in the queue
            for (already_queued, elapsed_ms) in [0_i64, 100, 200].iter().copied().enumerate() {
                let date_time = first_date_time + chrono::Duration::milliseconds(elapsed_ms);
                report_and_check(&telemetry, &operation, date_time, elapsed_ms);
                assert_pending(&telemetry, already_queued + 1, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let date_time = first_date_time + chrono::Duration::milliseconds(300);
            report_and_check(&telemetry, &operation, date_time, 300);

            assert!(is_empty_state(&telemetry));
        });
    }

    /// A queued event must be flushed exactly when `FLUSH_INTERVAL` has elapsed.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            report_and_check(&telemetry, &operation, first_date_time, 0);
            assert_pending(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` global, which `Telemetry::new` reads its settings from.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// Reports an app event at `date_time` and checks the event that comes back.
    fn report_and_check(
        telemetry: &Arc<Telemetry>,
        operation: &str,
        date_time: DateTime<Utc>,
        expected_milliseconds: i64,
    ) {
        let event = telemetry.report_app_event_with_date_time(operation.to_string(), date_time);
        assert_eq!(
            event,
            Event::App {
                operation: operation.to_string(),
                milliseconds_since_first_event: expected_milliseconds
            }
        );
    }

    /// Asserts that `queued` events are waiting, a flush is scheduled and the baseline is set.
    fn assert_pending(telemetry: &Telemetry, queued: usize, first_date_time: DateTime<Utc>) {
        assert_eq!(telemetry.state.lock().events_queue.len(), queued);
        assert!(telemetry.state.lock().flush_events_task.is_some());
        assert_eq!(
            telemetry.state.lock().first_event_date_time,
            Some(first_date_time)
        );
    }

    /// True when nothing is queued, no flush is scheduled and no baseline timestamp is set.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}