telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use chrono::{DateTime, Utc};
  5use futures::Future;
  6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  7use once_cell::sync::Lazy;
  8use parking_lot::Mutex;
  9use release_channel::ReleaseChannel;
 10use serde::Serialize;
 11use settings::{Settings, SettingsStore};
 12use sha2::{Digest, Sha256};
 13use std::io::Write;
 14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 15use sysinfo::{
 16    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 17};
 18use tempfile::NamedTempFile;
 19use util::http::{self, HttpClient, Method, ZedHttpClient};
 20#[cfg(not(debug_assertions))]
 21use util::ResultExt;
 22use util::TryFutureExt;
 23
 24use self::event_coalescer::EventCoalescer;
 25
/// Collects telemetry events, batches them, and uploads them to the Zed server.
///
/// Constructed with [`Telemetry::new`], which hands it out wrapped in an `Arc`.
pub struct Telemetry {
    /// Client used to POST event batches to `/api/events`.
    http_client: Arc<ZedHttpClient>,
    /// Executor on which the flush timer and the uploads are spawned.
    executor: BackgroundExecutor,
    /// Mutable state, shared with spawned tasks and the settings observer.
    state: Arc<Mutex<TelemetryState>>,
}
 31
/// Mutable telemetry state, guarded by the mutex held in [`Telemetry`].
struct TelemetryState {
    /// Snapshot of the telemetry settings; replaced whenever the global `SettingsStore` changes.
    settings: TelemetrySettings,
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<Arc<str>>,      // Per app launch
    /// Display name of the release channel, if a `ReleaseChannel` global was set.
    release_channel: Option<&'static str>,
    app_metadata: AppMetadata,
    /// CPU architecture of this build (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events reported since the last flush, waiting to be uploaded.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes after `FLUSH_INTERVAL`; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Local file that flushed events are appended to (only created in release builds).
    log_file: Option<NamedTempFile>,
    is_staff: Option<bool>,
    /// Time of the first event since the last flush; baseline for `milliseconds_since_first_event`.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Produces the `(start, end, environment)` periods that are reported as `Event::Edit`.
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush (once an installation id is known).
    max_queue_size: usize,
}
 48
/// JSON body POSTed to `/api/events`: session/app metadata plus one batch of events.
///
/// Serialized with serde's derive, so the field order below is the key order on the wire.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    installation_id: Option<Arc<str>>,
    session_id: Option<Arc<str>>,
    is_staff: Option<bool>,
    app_version: Option<String>,
    os_name: &'static str,
    os_version: Option<String>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    /// The batch of queued events being flushed.
    events: Vec<EventWrapper>,
}
 61
/// A queued event together with whether a user was signed in when it was reported.
#[derive(Serialize, Debug)]
struct EventWrapper {
    /// `true` when a metrics id was set at the time the event was queued.
    signed_in: bool,
    // Flattened: the event's own fields (including its `type` tag) sit next to `signed_in`.
    #[serde(flatten)]
    event: Event,
}
 68
/// Where an assistant interaction happened; serialized as `"panel"` or `"inline"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
 75
/// A single telemetry event.
///
/// Serialized with serde's internally tagged representation: a `"type"` field holds the
/// variant name (e.g. `"Editor"`) next to the variant's own fields. Every variant carries
/// `milliseconds_since_first_event`, its offset from the first event reported since the
/// previous flush.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// An editor operation, with the editor's vim/Copilot configuration at that moment.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// The outcome of a Copilot suggestion; `suggestion_accepted` records whether it was taken.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// A call operation, optionally associated with a room and/or a channel.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// An assistant interaction, in the panel or inline.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// A CPU usage sample; `Telemetry::start` reports one for this process every few minutes.
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// A memory usage sample; `Telemetry::start` reports one for this process every few minutes.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// An app-level operation, e.g. `"close"` when the app shuts down.
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// A setting and the value it was reported with.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// A coalesced editing period; `duration` is its length in milliseconds.
    Edit {
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// An action, together with the `source` it was reported from.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
135
/// Default queue length at which queued events are flushed immediately:
/// 5 in builds with debug assertions, 50 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Delay between the first queued event and the timer-driven flush:
/// one second in builds with debug assertions, five minutes otherwise.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
147
148static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
149    option_env!("ZED_CLIENT_CHECKSUM_SEED")
150        .map(|s| s.as_bytes().into())
151        .or_else(|| {
152            env::var("ZED_CLIENT_CHECKSUM_SEED")
153                .ok()
154                .map(|s| s.as_bytes().into())
155        })
156});
157
158impl Telemetry {
159    pub fn new(client: Arc<ZedHttpClient>, cx: &mut AppContext) -> Arc<Self> {
160        let release_channel =
161            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
162
163        TelemetrySettings::register(cx);
164
165        let state = Arc::new(Mutex::new(TelemetryState {
166            settings: TelemetrySettings::get_global(cx).clone(),
167            app_metadata: cx.app_metadata(),
168            architecture: env::consts::ARCH,
169            release_channel,
170            installation_id: None,
171            metrics_id: None,
172            session_id: None,
173            events_queue: Vec::new(),
174            flush_events_task: None,
175            log_file: None,
176            is_staff: None,
177            first_event_date_time: None,
178            event_coalescer: EventCoalescer::new(),
179            max_queue_size: MAX_QUEUE_LEN,
180        }));
181
182        #[cfg(not(debug_assertions))]
183        cx.background_executor()
184            .spawn({
185                let state = state.clone();
186                async move {
187                    if let Some(tempfile) =
188                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
189                    {
190                        state.lock().log_file = Some(tempfile);
191                    }
192                }
193            })
194            .detach();
195
196        cx.observe_global::<SettingsStore>({
197            let state = state.clone();
198
199            move |cx| {
200                let mut state = state.lock();
201                state.settings = TelemetrySettings::get_global(cx).clone();
202            }
203        })
204        .detach();
205
206        // TODO: Replace all hardware stuff with nested SystemSpecs json
207        let this = Arc::new(Self {
208            http_client: client,
209            executor: cx.background_executor().clone(),
210            state,
211        });
212
213        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
214        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
215        std::mem::forget(cx.on_app_quit({
216            let this = this.clone();
217            move |_| this.shutdown_telemetry()
218        }));
219
220        this
221    }
222
223    #[cfg(any(test, feature = "test-support"))]
224    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
225        Task::ready(())
226    }
227
228    // Skip calling this function in tests.
229    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
230    #[cfg(not(any(test, feature = "test-support")))]
231    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
232        self.report_app_event("close".to_string());
233        // TODO: close final edit period and make sure it's sent
234        Task::ready(())
235    }
236
237    pub fn log_file_path(&self) -> Option<PathBuf> {
238        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
239    }
240
241    pub fn start(
242        self: &Arc<Self>,
243        installation_id: Option<String>,
244        session_id: String,
245        cx: &mut AppContext,
246    ) {
247        let mut state = self.state.lock();
248        state.installation_id = installation_id.map(|id| id.into());
249        state.session_id = Some(session_id.into());
250        drop(state);
251
252        let this = self.clone();
253        cx.spawn(|_| async move {
254            // Avoiding calling `System::new_all()`, as there have been crashes related to it
255            let refresh_kind = RefreshKind::new()
256                .with_memory() // For memory usage
257                .with_processes(ProcessRefreshKind::everything()) // For process usage
258                .with_cpu(CpuRefreshKind::everything()); // For core count
259
260            let mut system = System::new_with_specifics(refresh_kind);
261
262            // Avoiding calling `refresh_all()`, just update what we need
263            system.refresh_specifics(refresh_kind);
264
265            // Waiting some amount of time before the first query is important to get a reasonable value
266            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
267            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
268
269            loop {
270                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
271
272                system.refresh_specifics(refresh_kind);
273
274                let current_process = Pid::from_u32(std::process::id());
275                let Some(process) = system.processes().get(&current_process) else {
276                    let process = current_process;
277                    log::error!("Failed to find own process {process:?} in system process table");
278                    // TODO: Fire an error telemetry event
279                    return;
280                };
281
282                this.report_memory_event(process.memory(), process.virtual_memory());
283                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
284            }
285        })
286        .detach();
287    }
288
289    pub fn set_authenticated_user_info(
290        self: &Arc<Self>,
291        metrics_id: Option<String>,
292        is_staff: bool,
293    ) {
294        let mut state = self.state.lock();
295
296        if !state.settings.metrics {
297            return;
298        }
299
300        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
301        state.metrics_id = metrics_id.clone();
302        state.is_staff = Some(is_staff);
303        drop(state);
304    }
305
306    pub fn report_editor_event(
307        self: &Arc<Self>,
308        file_extension: Option<String>,
309        vim_mode: bool,
310        operation: &'static str,
311        copilot_enabled: bool,
312        copilot_enabled_for_language: bool,
313    ) {
314        let event = Event::Editor {
315            file_extension,
316            vim_mode,
317            operation,
318            copilot_enabled,
319            copilot_enabled_for_language,
320            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
321        };
322
323        self.report_event(event)
324    }
325
326    pub fn report_copilot_event(
327        self: &Arc<Self>,
328        suggestion_id: Option<String>,
329        suggestion_accepted: bool,
330        file_extension: Option<String>,
331    ) {
332        let event = Event::Copilot {
333            suggestion_id,
334            suggestion_accepted,
335            file_extension,
336            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
337        };
338
339        self.report_event(event)
340    }
341
342    pub fn report_assistant_event(
343        self: &Arc<Self>,
344        conversation_id: Option<String>,
345        kind: AssistantKind,
346        model: &'static str,
347    ) {
348        let event = Event::Assistant {
349            conversation_id,
350            kind,
351            model,
352            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
353        };
354
355        self.report_event(event)
356    }
357
358    pub fn report_call_event(
359        self: &Arc<Self>,
360        operation: &'static str,
361        room_id: Option<u64>,
362        channel_id: Option<u64>,
363    ) {
364        let event = Event::Call {
365            operation,
366            room_id,
367            channel_id,
368            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
369        };
370
371        self.report_event(event)
372    }
373
374    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
375        let event = Event::Cpu {
376            usage_as_percentage,
377            core_count,
378            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
379        };
380
381        self.report_event(event)
382    }
383
384    pub fn report_memory_event(
385        self: &Arc<Self>,
386        memory_in_bytes: u64,
387        virtual_memory_in_bytes: u64,
388    ) {
389        let event = Event::Memory {
390            memory_in_bytes,
391            virtual_memory_in_bytes,
392            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
393        };
394
395        self.report_event(event)
396    }
397
398    pub fn report_app_event(self: &Arc<Self>, operation: String) {
399        self.report_app_event_with_date_time(operation, Utc::now());
400    }
401
402    fn report_app_event_with_date_time(
403        self: &Arc<Self>,
404        operation: String,
405        date_time: DateTime<Utc>,
406    ) -> Event {
407        let event = Event::App {
408            operation,
409            milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
410        };
411
412        self.report_event(event.clone());
413
414        event
415    }
416
417    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
418        let event = Event::Setting {
419            setting,
420            value,
421            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
422        };
423
424        self.report_event(event)
425    }
426
427    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
428        let mut state = self.state.lock();
429        let period_data = state.event_coalescer.log_event(environment);
430        drop(state);
431
432        if let Some((start, end, environment)) = period_data {
433            let event = Event::Edit {
434                duration: end.timestamp_millis() - start.timestamp_millis(),
435                environment,
436                milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
437            };
438
439            self.report_event(event);
440        }
441    }
442
443    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
444        let event = Event::Action {
445            source,
446            action,
447            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
448        };
449
450        self.report_event(event)
451    }
452
453    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
454        let mut state = self.state.lock();
455
456        match state.first_event_date_time {
457            Some(first_event_date_time) => {
458                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
459            }
460            None => {
461                state.first_event_date_time = Some(date_time);
462                0
463            }
464        }
465    }
466
467    fn report_event(self: &Arc<Self>, event: Event) {
468        let mut state = self.state.lock();
469
470        if !state.settings.metrics {
471            return;
472        }
473
474        if state.flush_events_task.is_none() {
475            let this = self.clone();
476            let executor = self.executor.clone();
477            state.flush_events_task = Some(self.executor.spawn(async move {
478                executor.timer(FLUSH_INTERVAL).await;
479                this.flush_events();
480            }));
481        }
482
483        let signed_in = state.metrics_id.is_some();
484        state.events_queue.push(EventWrapper { signed_in, event });
485
486        if state.installation_id.is_some() {
487            if state.events_queue.len() >= state.max_queue_size {
488                drop(state);
489                self.flush_events();
490            }
491        }
492    }
493
494    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
495        self.state.lock().metrics_id.clone()
496    }
497
498    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
499        self.state.lock().installation_id.clone()
500    }
501
502    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
503        self.state.lock().is_staff
504    }
505
506    pub fn flush_events(self: &Arc<Self>) {
507        let mut state = self.state.lock();
508        state.first_event_date_time = None;
509        let mut events = mem::take(&mut state.events_queue);
510        state.flush_events_task.take();
511        drop(state);
512        if events.is_empty() {
513            return;
514        }
515
516        let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
517            return;
518        };
519
520        let this = self.clone();
521        self.executor
522            .spawn(
523                async move {
524                    let mut json_bytes = Vec::new();
525
526                    if let Some(file) = &mut this.state.lock().log_file {
527                        let file = file.as_file_mut();
528                        for event in &mut events {
529                            json_bytes.clear();
530                            serde_json::to_writer(&mut json_bytes, event)?;
531                            file.write_all(&json_bytes)?;
532                            file.write(b"\n")?;
533                        }
534                    }
535
536                    {
537                        let state = this.state.lock();
538                        let request_body = EventRequestBody {
539                            installation_id: state.installation_id.clone(),
540                            session_id: state.session_id.clone(),
541                            is_staff: state.is_staff.clone(),
542                            app_version: state
543                                .app_metadata
544                                .app_version
545                                .map(|version| version.to_string()),
546                            os_name: state.app_metadata.os_name,
547                            os_version: state
548                                .app_metadata
549                                .os_version
550                                .map(|version| version.to_string()),
551                            architecture: state.architecture,
552
553                            release_channel: state.release_channel,
554                            events,
555                        };
556                        json_bytes.clear();
557                        serde_json::to_writer(&mut json_bytes, &request_body)?;
558                    }
559
560                    let mut summer = Sha256::new();
561                    summer.update(checksum_seed);
562                    summer.update(&json_bytes);
563                    summer.update(checksum_seed);
564                    let mut checksum = String::new();
565                    for byte in summer.finalize().as_slice() {
566                        use std::fmt::Write;
567                        write!(&mut checksum, "{:02x}", byte).unwrap();
568                    }
569
570                    let request = http::Request::builder()
571                        .method(Method::POST)
572                        .uri(&this.http_client.zed_url("/api/events"))
573                        .header("Content-Type", "text/plain")
574                        .header("x-zed-checksum", checksum)
575                        .body(json_bytes.into());
576
577                    let response = this.http_client.send(request?).await?;
578                    if response.status() != 200 {
579                        log::error!("Failed to send events: HTTP {:?}", response.status());
580                    }
581                    anyhow::Ok(())
582                }
583                .log_err(),
584            )
585            .detach();
586    }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    /// Reports app events one at a time and checks that reaching `max_queue_size` (lowered to
    /// 4 here) flushes the queue immediately and clears the first-event baseline.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            // `start` sets the installation id, which size-triggered flushes require.
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            // The first event becomes the baseline (offset 0) and schedules the timer flush.
            let event =
                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 0
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            let mut date_time = first_date_time + chrono::Duration::milliseconds(100);

            // Later events are offset from the baseline and simply queue up.
            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 100
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            date_time += chrono::Duration::milliseconds(100);

            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 200
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            date_time += chrono::Duration::milliseconds(100);

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event_with_date_time(operation.clone(), date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 300
                }
            );

            assert!(is_empty_state(&telemetry));
        });
    }

    /// Despite its name, this exercises the timer-based flush: a single queued event stays
    /// queued until exactly `FLUSH_INTERVAL` has elapsed on the test clock, then is flushed.
    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            let operation = "test".to_string();

            let event =
                telemetry.report_app_event_with_date_time(operation.clone(), first_date_time);
            assert_eq!(
                event,
                Event::App {
                    operation: operation.clone(),
                    milliseconds_since_first_event: 0
                }
            );
            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
            assert!(telemetry.state.lock().flush_events_task.is_some());
            assert_eq!(
                telemetry.state.lock().first_event_date_time,
                Some(first_date_time)
            );

            let duration = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - duration);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(duration);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` global so `Telemetry::new` can register and read
    /// `TelemetrySettings`.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// True when nothing is queued, no timer flush is pending, and the first-event baseline
    /// is cleared, i.e. the state before any event or right after a flush.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        telemetry.state.lock().events_queue.is_empty()
            && telemetry.state.lock().flush_events_task.is_none()
            && telemetry.state.lock().first_event_date_time.is_none()
    }
}