telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use chrono::{DateTime, Utc};
  5use futures::Future;
  6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  7use once_cell::sync::Lazy;
  8use parking_lot::Mutex;
  9use release_channel::ReleaseChannel;
 10use serde::Serialize;
 11use settings::{Settings, SettingsStore};
 12use sha2::{Digest, Sha256};
 13use std::io::Write;
 14use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 15use sysinfo::{
 16    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 17};
 18use tempfile::NamedTempFile;
 19use util::http::{self, HttpClient, Method, ZedHttpClient};
 20#[cfg(not(debug_assertions))]
 21use util::ResultExt;
 22use util::TryFutureExt;
 23
 24use self::event_coalescer::EventCoalescer;
 25
/// Collects telemetry events, batches them, and uploads them to Zed's `/api/events` endpoint.
///
/// Constructed once via `Telemetry::new` and handed out as `Arc<Telemetry>`. All mutable
/// data lives in `state` behind a mutex so it can be shared with the tasks this type spawns.
pub struct Telemetry {
    /// Client used to POST event batches.
    http_client: Arc<ZedHttpClient>,
    /// Executor that runs the flush timer and the background uploads.
    executor: BackgroundExecutor,
    /// Mutable telemetry state, shared with spawned tasks.
    state: Arc<Mutex<TelemetryState>>,
}
 31
/// Mutable state behind `Telemetry::state`.
struct TelemetryState {
    /// Snapshot of `TelemetrySettings`, refreshed whenever the global `SettingsStore` changes.
    settings: TelemetrySettings,
    /// Per logged-in user.
    metrics_id: Option<Arc<str>>,
    /// Per app installation (different for dev, nightly, preview, and stable).
    installation_id: Option<Arc<str>>,
    /// Per app launch.
    session_id: Option<Arc<str>>,
    /// Display name of the release channel, if a `ReleaseChannel` global was set.
    release_channel: Option<&'static str>,
    /// App version / OS information captured at construction.
    app_metadata: AppMetadata,
    /// CPU architecture of this build (`std::env::consts::ARCH`).
    architecture: &'static str,
    /// Events waiting for the next flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes after `FLUSH_INTERVAL`; `None` when none is scheduled.
    flush_events_task: Option<Task<()>>,
    /// Temporary file that flushed events are mirrored into (only created in non-debug builds).
    log_file: Option<NamedTempFile>,
    /// Staff flag of the authenticated user, if known.
    is_staff: Option<bool>,
    /// Baseline for `milliseconds_since_first_event`; cleared on every flush.
    first_event_date_time: Option<DateTime<Utc>>,
    /// Groups individual edits into periods reported as `Event::Edit`.
    event_coalescer: EventCoalescer,
    /// Queue length that triggers an immediate flush (defaults to `MAX_QUEUE_LEN`).
    max_queue_size: usize,
}
 48
/// JSON payload POSTed to `/api/events`: identity and platform info plus a batch of events.
///
/// NOTE: field order determines the key order of the serialized JSON.
#[derive(Serialize, Debug)]
struct EventRequestBody {
    /// Per-installation id, if `Telemetry::start` provided one.
    installation_id: Option<Arc<str>>,
    /// Per-launch session id.
    session_id: Option<Arc<str>>,
    /// Staff flag of the authenticated user, if known.
    is_staff: Option<bool>,
    /// App version rendered with `to_string`.
    app_version: Option<String>,
    /// OS name from the app metadata.
    os_name: &'static str,
    /// OS version rendered with `to_string`.
    os_version: Option<String>,
    /// CPU architecture of this build.
    architecture: &'static str,
    /// Display name of the release channel.
    release_channel: Option<&'static str>,
    /// The batch of events being flushed.
    events: Vec<EventWrapper>,
}
 61
/// A queued event plus whether a user was signed in when it was reported.
#[derive(Serialize, Debug)]
struct EventWrapper {
    /// True when a metrics id was set at report time.
    signed_in: bool,
    /// Flattened: the event's own fields (including its `type` tag) sit beside `signed_in`.
    #[serde(flatten)]
    event: Event,
}
 68
/// Which assistant surface produced an `Event::Assistant`; serialized as `"panel"` / `"inline"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AssistantKind {
    Panel,
    Inline,
}
 75
/// A single telemetry event.
///
/// Serialized as an internally tagged object: the variant name goes in a `"type"` field next
/// to the variant's own fields. Every variant carries `milliseconds_since_first_event`, the
/// offset from the first event reported since the last flush.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type")]
pub enum Event {
    /// An editor operation, with the file's extension and the editing modes in effect.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
        milliseconds_since_first_event: i64,
    },
    /// A Copilot suggestion and whether it was accepted.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
        milliseconds_since_first_event: i64,
    },
    /// A call operation, optionally tied to a room and/or channel.
    Call {
        operation: &'static str,
        room_id: Option<u64>,
        channel_id: Option<u64>,
        milliseconds_since_first_event: i64,
    },
    /// An assistant interaction.
    Assistant {
        conversation_id: Option<String>,
        kind: AssistantKind,
        model: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// Periodic CPU sample of this process (from `sysinfo`'s process `cpu_usage`).
    Cpu {
        usage_as_percentage: f32,
        core_count: u32,
        milliseconds_since_first_event: i64,
    },
    /// Periodic memory sample of this process.
    Memory {
        memory_in_bytes: u64,
        virtual_memory_in_bytes: u64,
        milliseconds_since_first_event: i64,
    },
    /// An app-level operation such as `"close"`.
    App {
        operation: String,
        milliseconds_since_first_event: i64,
    },
    /// A setting and its value rendered as a string.
    Setting {
        setting: &'static str,
        value: String,
        milliseconds_since_first_event: i64,
    },
    /// A coalesced editing period; `duration` is end minus start in milliseconds.
    Edit {
        duration: i64,
        environment: &'static str,
        milliseconds_since_first_event: i64,
    },
    /// An action dispatched from `source`.
    Action {
        source: &'static str,
        action: String,
        milliseconds_since_first_event: i64,
    },
}
135
/// Queue length that triggers an immediate flush once an installation id is known.
/// Debug builds use a much smaller batch than release builds.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 5 } else { 50 };

/// Delay between the first queued event and the timer-driven flush.
/// Debug builds flush after one second, release builds after five minutes.
const FLUSH_INTERVAL: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(1)
} else {
    Duration::from_secs(60 * 5)
};
147
148static ZED_CLIENT_CHECKSUM_SEED: Lazy<Vec<u8>> = Lazy::new(|| {
149    option_env!("ZED_CLIENT_CHECKSUM_SEED")
150        .unwrap_or("development-checksum-seed")
151        .as_bytes()
152        .into()
153});
154
155impl Telemetry {
156    pub fn new(client: Arc<ZedHttpClient>, cx: &mut AppContext) -> Arc<Self> {
157        let release_channel =
158            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
159
160        TelemetrySettings::register(cx);
161
162        let state = Arc::new(Mutex::new(TelemetryState {
163            settings: TelemetrySettings::get_global(cx).clone(),
164            app_metadata: cx.app_metadata(),
165            architecture: env::consts::ARCH,
166            release_channel,
167            installation_id: None,
168            metrics_id: None,
169            session_id: None,
170            events_queue: Vec::new(),
171            flush_events_task: None,
172            log_file: None,
173            is_staff: None,
174            first_event_date_time: None,
175            event_coalescer: EventCoalescer::new(),
176            max_queue_size: MAX_QUEUE_LEN,
177        }));
178
179        #[cfg(not(debug_assertions))]
180        cx.background_executor()
181            .spawn({
182                let state = state.clone();
183                async move {
184                    if let Some(tempfile) =
185                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
186                    {
187                        state.lock().log_file = Some(tempfile);
188                    }
189                }
190            })
191            .detach();
192
193        cx.observe_global::<SettingsStore>({
194            let state = state.clone();
195
196            move |cx| {
197                let mut state = state.lock();
198                state.settings = TelemetrySettings::get_global(cx).clone();
199            }
200        })
201        .detach();
202
203        // TODO: Replace all hardware stuff with nested SystemSpecs json
204        let this = Arc::new(Self {
205            http_client: client,
206            executor: cx.background_executor().clone(),
207            state,
208        });
209
210        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
211        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
212        std::mem::forget(cx.on_app_quit({
213            let this = this.clone();
214            move |_| this.shutdown_telemetry()
215        }));
216
217        this
218    }
219
220    #[cfg(any(test, feature = "test-support"))]
221    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
222        Task::ready(())
223    }
224
225    // Skip calling this function in tests.
226    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
227    #[cfg(not(any(test, feature = "test-support")))]
228    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
229        self.report_app_event("close".to_string());
230        // TODO: close final edit period and make sure it's sent
231        Task::ready(())
232    }
233
234    pub fn log_file_path(&self) -> Option<PathBuf> {
235        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
236    }
237
238    pub fn start(
239        self: &Arc<Self>,
240        installation_id: Option<String>,
241        session_id: String,
242        cx: &mut AppContext,
243    ) {
244        let mut state = self.state.lock();
245        state.installation_id = installation_id.map(|id| id.into());
246        state.session_id = Some(session_id.into());
247        drop(state);
248
249        let this = self.clone();
250        cx.spawn(|_| async move {
251            // Avoiding calling `System::new_all()`, as there have been crashes related to it
252            let refresh_kind = RefreshKind::new()
253                .with_memory() // For memory usage
254                .with_processes(ProcessRefreshKind::everything()) // For process usage
255                .with_cpu(CpuRefreshKind::everything()); // For core count
256
257            let mut system = System::new_with_specifics(refresh_kind);
258
259            // Avoiding calling `refresh_all()`, just update what we need
260            system.refresh_specifics(refresh_kind);
261
262            // Waiting some amount of time before the first query is important to get a reasonable value
263            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
264            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
265
266            loop {
267                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
268
269                system.refresh_specifics(refresh_kind);
270
271                let current_process = Pid::from_u32(std::process::id());
272                let Some(process) = system.processes().get(&current_process) else {
273                    let process = current_process;
274                    log::error!("Failed to find own process {process:?} in system process table");
275                    // TODO: Fire an error telemetry event
276                    return;
277                };
278
279                this.report_memory_event(process.memory(), process.virtual_memory());
280                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
281            }
282        })
283        .detach();
284    }
285
286    pub fn set_authenticated_user_info(
287        self: &Arc<Self>,
288        metrics_id: Option<String>,
289        is_staff: bool,
290    ) {
291        let mut state = self.state.lock();
292
293        if !state.settings.metrics {
294            return;
295        }
296
297        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
298        state.metrics_id = metrics_id.clone();
299        state.is_staff = Some(is_staff);
300        drop(state);
301    }
302
303    pub fn report_editor_event(
304        self: &Arc<Self>,
305        file_extension: Option<String>,
306        vim_mode: bool,
307        operation: &'static str,
308        copilot_enabled: bool,
309        copilot_enabled_for_language: bool,
310    ) {
311        let event = Event::Editor {
312            file_extension,
313            vim_mode,
314            operation,
315            copilot_enabled,
316            copilot_enabled_for_language,
317            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
318        };
319
320        self.report_event(event)
321    }
322
323    pub fn report_copilot_event(
324        self: &Arc<Self>,
325        suggestion_id: Option<String>,
326        suggestion_accepted: bool,
327        file_extension: Option<String>,
328    ) {
329        let event = Event::Copilot {
330            suggestion_id,
331            suggestion_accepted,
332            file_extension,
333            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
334        };
335
336        self.report_event(event)
337    }
338
339    pub fn report_assistant_event(
340        self: &Arc<Self>,
341        conversation_id: Option<String>,
342        kind: AssistantKind,
343        model: &'static str,
344    ) {
345        let event = Event::Assistant {
346            conversation_id,
347            kind,
348            model,
349            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
350        };
351
352        self.report_event(event)
353    }
354
355    pub fn report_call_event(
356        self: &Arc<Self>,
357        operation: &'static str,
358        room_id: Option<u64>,
359        channel_id: Option<u64>,
360    ) {
361        let event = Event::Call {
362            operation,
363            room_id,
364            channel_id,
365            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
366        };
367
368        self.report_event(event)
369    }
370
371    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
372        let event = Event::Cpu {
373            usage_as_percentage,
374            core_count,
375            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
376        };
377
378        self.report_event(event)
379    }
380
381    pub fn report_memory_event(
382        self: &Arc<Self>,
383        memory_in_bytes: u64,
384        virtual_memory_in_bytes: u64,
385    ) {
386        let event = Event::Memory {
387            memory_in_bytes,
388            virtual_memory_in_bytes,
389            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
390        };
391
392        self.report_event(event)
393    }
394
395    pub fn report_app_event(self: &Arc<Self>, operation: String) {
396        self.report_app_event_with_date_time(operation, Utc::now());
397    }
398
399    fn report_app_event_with_date_time(
400        self: &Arc<Self>,
401        operation: String,
402        date_time: DateTime<Utc>,
403    ) -> Event {
404        let event = Event::App {
405            operation,
406            milliseconds_since_first_event: self.milliseconds_since_first_event(date_time),
407        };
408
409        self.report_event(event.clone());
410
411        event
412    }
413
414    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
415        let event = Event::Setting {
416            setting,
417            value,
418            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
419        };
420
421        self.report_event(event)
422    }
423
424    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
425        let mut state = self.state.lock();
426        let period_data = state.event_coalescer.log_event(environment);
427        drop(state);
428
429        if let Some((start, end, environment)) = period_data {
430            let event = Event::Edit {
431                duration: end.timestamp_millis() - start.timestamp_millis(),
432                environment,
433                milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
434            };
435
436            self.report_event(event);
437        }
438    }
439
440    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
441        let event = Event::Action {
442            source,
443            action,
444            milliseconds_since_first_event: self.milliseconds_since_first_event(Utc::now()),
445        };
446
447        self.report_event(event)
448    }
449
450    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
451        let mut state = self.state.lock();
452
453        match state.first_event_date_time {
454            Some(first_event_date_time) => {
455                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
456            }
457            None => {
458                state.first_event_date_time = Some(date_time);
459                0
460            }
461        }
462    }
463
464    fn report_event(self: &Arc<Self>, event: Event) {
465        let mut state = self.state.lock();
466
467        if !state.settings.metrics {
468            return;
469        }
470
471        if state.flush_events_task.is_none() {
472            let this = self.clone();
473            let executor = self.executor.clone();
474            state.flush_events_task = Some(self.executor.spawn(async move {
475                executor.timer(FLUSH_INTERVAL).await;
476                this.flush_events();
477            }));
478        }
479
480        let signed_in = state.metrics_id.is_some();
481        state.events_queue.push(EventWrapper { signed_in, event });
482
483        if state.installation_id.is_some() {
484            if state.events_queue.len() >= state.max_queue_size {
485                drop(state);
486                self.flush_events();
487            }
488        }
489    }
490
491    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
492        self.state.lock().metrics_id.clone()
493    }
494
495    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
496        self.state.lock().installation_id.clone()
497    }
498
499    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
500        self.state.lock().is_staff
501    }
502
503    pub fn flush_events(self: &Arc<Self>) {
504        let mut state = self.state.lock();
505        state.first_event_date_time = None;
506        let mut events = mem::take(&mut state.events_queue);
507        state.flush_events_task.take();
508        drop(state);
509        if events.is_empty() {
510            return;
511        }
512
513        let this = self.clone();
514        self.executor
515            .spawn(
516                async move {
517                    let mut json_bytes = Vec::new();
518
519                    if let Some(file) = &mut this.state.lock().log_file {
520                        let file = file.as_file_mut();
521                        for event in &mut events {
522                            json_bytes.clear();
523                            serde_json::to_writer(&mut json_bytes, event)?;
524                            file.write_all(&json_bytes)?;
525                            file.write(b"\n")?;
526                        }
527                    }
528
529                    {
530                        let state = this.state.lock();
531                        let request_body = EventRequestBody {
532                            installation_id: state.installation_id.clone(),
533                            session_id: state.session_id.clone(),
534                            is_staff: state.is_staff.clone(),
535                            app_version: state
536                                .app_metadata
537                                .app_version
538                                .map(|version| version.to_string()),
539                            os_name: state.app_metadata.os_name,
540                            os_version: state
541                                .app_metadata
542                                .os_version
543                                .map(|version| version.to_string()),
544                            architecture: state.architecture,
545
546                            release_channel: state.release_channel,
547                            events,
548                        };
549                        json_bytes.clear();
550                        serde_json::to_writer(&mut json_bytes, &request_body)?;
551                    }
552
553                    let mut summer = Sha256::new();
554                    summer.update(&*ZED_CLIENT_CHECKSUM_SEED);
555                    summer.update(&json_bytes);
556                    summer.update(&*ZED_CLIENT_CHECKSUM_SEED);
557                    let mut checksum = String::new();
558                    for byte in summer.finalize().as_slice() {
559                        use std::fmt::Write;
560                        write!(&mut checksum, "{:02x}", byte).unwrap();
561                    }
562
563                    let request = http::Request::builder()
564                        .method(Method::POST)
565                        .uri(&this.http_client.zed_url("/api/events"))
566                        .header("Content-Type", "text/plain")
567                        .header("x-zed-checksum", checksum)
568                        .body(json_bytes.into());
569
570                    let response = this.http_client.send(request?).await?;
571                    if response.status() != 200 {
572                        log::error!("Failed to send events: HTTP {:?}", response.status());
573                    }
574                    anyhow::Ok(())
575                }
576                .log_err(),
577            )
578            .detach();
579    }
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use gpui::TestAppContext;
    use util::http::FakeHttpClient;

    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();

            // Events 1-3 stay below the queue limit of 4; each is timed relative to the first.
            for (index, offset_ms) in [0, 100, 200].iter().copied().enumerate() {
                let date_time = first_date_time + chrono::Duration::milliseconds(offset_ms);
                report_and_expect(&telemetry, "test", date_time, offset_ms);
                assert_pending(&telemetry, index + 1, first_date_time);
            }

            // Adding a 4th event should cause a flush
            let date_time = first_date_time + chrono::Duration::milliseconds(300);
            report_and_expect(&telemetry, "test", date_time, 300);

            assert!(is_empty_state(&telemetry));
        });
    }

    #[gpui::test]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let http = FakeHttpClient::with_200_response();
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = Utc.with_ymd_and_hms(1990, 4, 12, 12, 0, 0).unwrap();
            report_and_expect(&telemetry, "test", first_date_time, 0);
            assert_pending(&telemetry, 1, first_date_time);

            let one_millisecond = Duration::from_millis(1);

            // One millisecond short of the flush interval: the queue is still pending.
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);
            assert!(!is_empty_state(&telemetry));

            // Exactly at the flush interval: the timer task flushes everything.
            executor.advance_clock(one_millisecond);
            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Reports an `App` event with `operation` at `date_time`. It then asserts that the
    /// returned event carries that operation and the expected offset from the first event.
    fn report_and_expect(
        telemetry: &Arc<Telemetry>,
        operation: &str,
        date_time: DateTime<Utc>,
        expected_milliseconds_since_first_event: i64,
    ) {
        let event = telemetry.report_app_event_with_date_time(operation.to_string(), date_time);
        assert_eq!(
            event,
            Event::App {
                operation: operation.to_string(),
                milliseconds_since_first_event: expected_milliseconds_since_first_event,
            }
        );
    }

    /// Asserts that `expected_len` events are queued, a timed flush is scheduled, and the
    /// first-event baseline is `first_date_time`.
    fn assert_pending(
        telemetry: &Telemetry,
        expected_len: usize,
        first_date_time: DateTime<Utc>,
    ) {
        let state = telemetry.state.lock();
        assert_eq!(state.events_queue.len(), expected_len);
        assert!(state.flush_events_task.is_some());
        assert_eq!(state.first_event_date_time, Some(first_date_time));
    }

    /// Installs a test `SettingsStore` as a global so `Telemetry::new` can read its settings.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// True when nothing is queued, no flush is scheduled, and no baseline time is recorded.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}