telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use anyhow::Result;
  5use clock::SystemClock;
  6use collections::{HashMap, HashSet};
  7use futures::channel::mpsc;
  8use futures::{Future, StreamExt};
  9use gpui::{AppContext, BackgroundExecutor, Task};
 10use http_client::{self, AsyncBody, HttpClient, HttpClientWithUrl, Method, Request};
 11use once_cell::sync::Lazy;
 12use parking_lot::Mutex;
 13use release_channel::ReleaseChannel;
 14use settings::{Settings, SettingsStore};
 15use sha2::{Digest, Sha256};
 16use std::fs::File;
 17use std::io::Write;
 18use std::time::Instant;
 19use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 20use telemetry_events::{
 21    AppEvent, AssistantEvent, CallEvent, EditEvent, Event, EventRequestBody, EventWrapper,
 22    InlineCompletionEvent,
 23};
 24use util::{ResultExt, TryFutureExt};
 25use worktree::{UpdatedEntriesSet, WorktreeId};
 26
 27use self::event_coalescer::EventCoalescer;
 28
 29pub struct Telemetry {
 30    clock: Arc<dyn SystemClock>,
 31    http_client: Arc<HttpClientWithUrl>,
 32    executor: BackgroundExecutor,
 33    state: Arc<Mutex<TelemetryState>>,
 34}
 35
 36struct TelemetryState {
 37    settings: TelemetrySettings,
 38    system_id: Option<Arc<str>>,       // Per system
 39    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 40    session_id: Option<String>,        // Per app launch
 41    metrics_id: Option<Arc<str>>,      // Per logged-in user
 42    release_channel: Option<&'static str>,
 43    architecture: &'static str,
 44    events_queue: Vec<EventWrapper>,
 45    flush_events_task: Option<Task<()>>,
 46    log_file: Option<File>,
 47    is_staff: Option<bool>,
 48    first_event_date_time: Option<Instant>,
 49    event_coalescer: EventCoalescer,
 50    max_queue_size: usize,
 51    worktree_id_map: WorktreeIdMap,
 52
 53    os_name: String,
 54    app_version: String,
 55    os_version: Option<String>,
 56}
 57
 58#[derive(Debug)]
 59struct WorktreeIdMap(HashMap<String, ProjectCache>);
 60
 61#[derive(Debug)]
 62struct ProjectCache {
 63    name: String,
 64    worktree_ids_reported: HashSet<WorktreeId>,
 65}
 66
 67impl ProjectCache {
 68    fn new(name: String) -> Self {
 69        Self {
 70            name,
 71            worktree_ids_reported: HashSet::default(),
 72        }
 73    }
 74}
 75
 76#[cfg(debug_assertions)]
 77const MAX_QUEUE_LEN: usize = 5;
 78
 79#[cfg(not(debug_assertions))]
 80const MAX_QUEUE_LEN: usize = 50;
 81
 82#[cfg(debug_assertions)]
 83const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 84
 85#[cfg(not(debug_assertions))]
 86const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 87static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 88    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 89        .map(|s| s.as_bytes().into())
 90        .or_else(|| {
 91            env::var("ZED_CLIENT_CHECKSUM_SEED")
 92                .ok()
 93                .map(|s| s.as_bytes().into())
 94        })
 95});
 96
 97pub fn os_name() -> String {
 98    #[cfg(target_os = "macos")]
 99    {
100        "macOS".to_string()
101    }
102    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
103    {
104        format!("Linux {}", gpui::guess_compositor())
105    }
106
107    #[cfg(target_os = "windows")]
108    {
109        "Windows".to_string()
110    }
111}
112
113/// Note: This might do blocking IO! Only call from background threads
114pub fn os_version() -> String {
115    #[cfg(target_os = "macos")]
116    {
117        use cocoa::base::nil;
118        use cocoa::foundation::NSProcessInfo;
119
120        unsafe {
121            let process_info = cocoa::foundation::NSProcessInfo::processInfo(nil);
122            let version = process_info.operatingSystemVersion();
123            gpui::SemanticVersion::new(
124                version.majorVersion as usize,
125                version.minorVersion as usize,
126                version.patchVersion as usize,
127            )
128            .to_string()
129        }
130    }
131    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
132    {
133        use std::path::Path;
134
135        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
136            file
137        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
138            file
139        } else {
140            log::error!("Failed to load /etc/os-release, /usr/lib/os-release");
141            "".to_string()
142        };
143        let mut name = "unknown".to_string();
144        let mut version = "unknown".to_string();
145
146        for line in content.lines() {
147            if line.starts_with("ID=") {
148                name = line.trim_start_matches("ID=").trim_matches('"').to_string();
149            }
150            if line.starts_with("VERSION_ID=") {
151                version = line
152                    .trim_start_matches("VERSION_ID=")
153                    .trim_matches('"')
154                    .to_string();
155            }
156        }
157
158        format!("{} {}", name, version)
159    }
160
161    #[cfg(target_os = "windows")]
162    {
163        let mut info = unsafe { std::mem::zeroed() };
164        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
165        if status.is_ok() {
166            gpui::SemanticVersion::new(
167                info.dwMajorVersion as _,
168                info.dwMinorVersion as _,
169                info.dwBuildNumber as _,
170            )
171            .to_string()
172        } else {
173            "unknown".to_string()
174        }
175    }
176}
177
178impl Telemetry {
179    pub fn new(
180        clock: Arc<dyn SystemClock>,
181        client: Arc<HttpClientWithUrl>,
182        cx: &mut AppContext,
183    ) -> Arc<Self> {
184        let release_channel =
185            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
186
187        TelemetrySettings::register(cx);
188
189        let state = Arc::new(Mutex::new(TelemetryState {
190            settings: *TelemetrySettings::get_global(cx),
191            architecture: env::consts::ARCH,
192            release_channel,
193            system_id: None,
194            installation_id: None,
195            session_id: None,
196            metrics_id: None,
197            events_queue: Vec::new(),
198            flush_events_task: None,
199            log_file: None,
200            is_staff: None,
201            first_event_date_time: None,
202            event_coalescer: EventCoalescer::new(clock.clone()),
203            max_queue_size: MAX_QUEUE_LEN,
204            worktree_id_map: WorktreeIdMap(HashMap::from_iter([
205                (
206                    "pnpm-lock.yaml".to_string(),
207                    ProjectCache::new("pnpm".to_string()),
208                ),
209                (
210                    "yarn.lock".to_string(),
211                    ProjectCache::new("yarn".to_string()),
212                ),
213                (
214                    "package.json".to_string(),
215                    ProjectCache::new("node".to_string()),
216                ),
217            ])),
218
219            os_version: None,
220            os_name: os_name(),
221            app_version: release_channel::AppVersion::global(cx).to_string(),
222        }));
223        Self::log_file_path();
224
225        cx.background_executor()
226            .spawn({
227                let state = state.clone();
228                let os_version = os_version();
229                state.lock().os_version = Some(os_version.clone());
230                async move {
231                    if let Some(tempfile) = File::create(Self::log_file_path()).log_err() {
232                        state.lock().log_file = Some(tempfile);
233                    }
234                }
235            })
236            .detach();
237
238        cx.observe_global::<SettingsStore>({
239            let state = state.clone();
240
241            move |cx| {
242                let mut state = state.lock();
243                state.settings = *TelemetrySettings::get_global(cx);
244            }
245        })
246        .detach();
247
248        let this = Arc::new(Self {
249            clock,
250            http_client: client,
251            executor: cx.background_executor().clone(),
252            state,
253        });
254
255        let (tx, mut rx) = mpsc::unbounded();
256        ::telemetry::init(tx);
257
258        cx.background_executor()
259            .spawn({
260                let this = Arc::downgrade(&this);
261                async move {
262                    while let Some(event) = rx.next().await {
263                        let Some(state) = this.upgrade() else { break };
264                        state.report_event(Event::Flexible(event))
265                    }
266                }
267            })
268            .detach();
269
270        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
271        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
272        std::mem::forget(cx.on_app_quit({
273            let this = this.clone();
274            move |_| this.shutdown_telemetry()
275        }));
276
277        this
278    }
279
280    #[cfg(any(test, feature = "test-support"))]
281    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
282        Task::ready(())
283    }
284
285    // Skip calling this function in tests.
286    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
287    #[cfg(not(any(test, feature = "test-support")))]
288    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
289        self.report_app_event("close".to_string());
290        // TODO: close final edit period and make sure it's sent
291        Task::ready(())
292    }
293
294    pub fn log_file_path() -> PathBuf {
295        paths::logs_dir().join("telemetry.log")
296    }
297
298    pub fn start(
299        self: &Arc<Self>,
300        system_id: Option<String>,
301        installation_id: Option<String>,
302        session_id: String,
303        cx: &AppContext,
304    ) {
305        let mut state = self.state.lock();
306        state.system_id = system_id.map(|id| id.into());
307        state.installation_id = installation_id.map(|id| id.into());
308        state.session_id = Some(session_id);
309        state.app_version = release_channel::AppVersion::global(cx).to_string();
310        state.os_name = os_name();
311    }
312
313    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
314        let state = self.state.lock();
315        let enabled = state.settings.metrics;
316        drop(state);
317        enabled
318    }
319
320    pub fn set_authenticated_user_info(
321        self: &Arc<Self>,
322        metrics_id: Option<String>,
323        is_staff: bool,
324    ) {
325        let mut state = self.state.lock();
326
327        if !state.settings.metrics {
328            return;
329        }
330
331        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
332        state.metrics_id.clone_from(&metrics_id);
333        state.is_staff = Some(is_staff);
334        drop(state);
335    }
336
337    pub fn report_inline_completion_event(
338        self: &Arc<Self>,
339        provider: String,
340        suggestion_accepted: bool,
341        file_extension: Option<String>,
342    ) {
343        let event = Event::InlineCompletion(InlineCompletionEvent {
344            provider,
345            suggestion_accepted,
346            file_extension,
347        });
348
349        self.report_event(event)
350    }
351
352    pub fn report_assistant_event(self: &Arc<Self>, event: AssistantEvent) {
353        self.report_event(Event::Assistant(event));
354    }
355
356    pub fn report_call_event(
357        self: &Arc<Self>,
358        operation: &'static str,
359        room_id: Option<u64>,
360        channel_id: Option<ChannelId>,
361    ) {
362        let event = Event::Call(CallEvent {
363            operation: operation.to_string(),
364            room_id,
365            channel_id: channel_id.map(|cid| cid.0),
366        });
367
368        self.report_event(event)
369    }
370
371    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
372        let event = Event::App(AppEvent { operation });
373
374        self.report_event(event.clone());
375
376        event
377    }
378
379    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str, is_via_ssh: bool) {
380        let mut state = self.state.lock();
381        let period_data = state.event_coalescer.log_event(environment);
382        drop(state);
383
384        if let Some((start, end, environment)) = period_data {
385            let event = Event::Edit(EditEvent {
386                duration: end
387                    .saturating_duration_since(start)
388                    .min(Duration::from_secs(60 * 60 * 24))
389                    .as_millis() as i64,
390                environment: environment.to_string(),
391                is_via_ssh,
392            });
393
394            self.report_event(event);
395        }
396    }
397
398    pub fn report_discovered_project_events(
399        self: &Arc<Self>,
400        worktree_id: WorktreeId,
401        updated_entries_set: &UpdatedEntriesSet,
402    ) {
403        let project_type_names: Vec<String> = {
404            let mut state = self.state.lock();
405            state
406                .worktree_id_map
407                .0
408                .iter_mut()
409                .filter_map(|(project_file_name, project_type_telemetry)| {
410                    if project_type_telemetry
411                        .worktree_ids_reported
412                        .contains(&worktree_id)
413                    {
414                        return None;
415                    }
416
417                    let project_file_found = updated_entries_set.iter().any(|(path, _, _)| {
418                        path.as_ref()
419                            .file_name()
420                            .and_then(|name| name.to_str())
421                            .map(|name_str| name_str == project_file_name)
422                            .unwrap_or(false)
423                    });
424
425                    if !project_file_found {
426                        return None;
427                    }
428
429                    project_type_telemetry
430                        .worktree_ids_reported
431                        .insert(worktree_id);
432
433                    Some(project_type_telemetry.name.clone())
434                })
435                .collect()
436        };
437
438        // Done on purpose to avoid calling `self.state.lock()` multiple times
439        for project_type_name in project_type_names {
440            self.report_app_event(format!("open {} project", project_type_name));
441        }
442    }
443
444    fn report_event(self: &Arc<Self>, event: Event) {
445        let mut state = self.state.lock();
446
447        if !state.settings.metrics {
448            return;
449        }
450
451        if state.flush_events_task.is_none() {
452            let this = self.clone();
453            let executor = self.executor.clone();
454            state.flush_events_task = Some(self.executor.spawn(async move {
455                executor.timer(FLUSH_INTERVAL).await;
456                this.flush_events();
457            }));
458        }
459
460        let date_time = self.clock.utc_now();
461
462        let milliseconds_since_first_event = match state.first_event_date_time {
463            Some(first_event_date_time) => date_time
464                .saturating_duration_since(first_event_date_time)
465                .min(Duration::from_secs(60 * 60 * 24))
466                .as_millis() as i64,
467            None => {
468                state.first_event_date_time = Some(date_time);
469                0
470            }
471        };
472
473        let signed_in = state.metrics_id.is_some();
474        state.events_queue.push(EventWrapper {
475            signed_in,
476            milliseconds_since_first_event,
477            event,
478        });
479
480        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
481            drop(state);
482            self.flush_events();
483        }
484    }
485
486    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
487        self.state.lock().metrics_id.clone()
488    }
489
490    pub fn system_id(self: &Arc<Self>) -> Option<Arc<str>> {
491        self.state.lock().system_id.clone()
492    }
493
494    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
495        self.state.lock().installation_id.clone()
496    }
497
498    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
499        self.state.lock().is_staff
500    }
501
502    fn build_request(
503        self: &Arc<Self>,
504        // We take in the JSON bytes buffer so we can reuse the existing allocation.
505        mut json_bytes: Vec<u8>,
506        event_request: EventRequestBody,
507    ) -> Result<Request<AsyncBody>> {
508        json_bytes.clear();
509        serde_json::to_writer(&mut json_bytes, &event_request)?;
510
511        let checksum = calculate_json_checksum(&json_bytes).unwrap_or("".to_string());
512
513        Ok(Request::builder()
514            .method(Method::POST)
515            .uri(
516                self.http_client
517                    .build_zed_api_url("/telemetry/events", &[])?
518                    .as_ref(),
519            )
520            .header("Content-Type", "application/json")
521            .header("x-zed-checksum", checksum)
522            .body(json_bytes.into())?)
523    }
524
525    pub fn flush_events(self: &Arc<Self>) {
526        let mut state = self.state.lock();
527        state.first_event_date_time = None;
528        let mut events = mem::take(&mut state.events_queue);
529        state.flush_events_task.take();
530        drop(state);
531        if events.is_empty() {
532            return;
533        }
534
535        let this = self.clone();
536        self.executor
537            .spawn(
538                async move {
539                    let mut json_bytes = Vec::new();
540
541                    if let Some(file) = &mut this.state.lock().log_file {
542                        for event in &mut events {
543                            json_bytes.clear();
544                            serde_json::to_writer(&mut json_bytes, event)?;
545                            file.write_all(&json_bytes)?;
546                            file.write_all(b"\n")?;
547                        }
548                    }
549
550                    let request_body = {
551                        let state = this.state.lock();
552
553                        EventRequestBody {
554                            system_id: state.system_id.as_deref().map(Into::into),
555                            installation_id: state.installation_id.as_deref().map(Into::into),
556                            session_id: state.session_id.clone(),
557                            metrics_id: state.metrics_id.as_deref().map(Into::into),
558                            is_staff: state.is_staff,
559                            app_version: state.app_version.clone(),
560                            os_name: state.os_name.clone(),
561                            os_version: state.os_version.clone(),
562                            architecture: state.architecture.to_string(),
563
564                            release_channel: state.release_channel.map(Into::into),
565                            events,
566                        }
567                    };
568
569                    let request = this.build_request(json_bytes, request_body)?;
570                    let response = this.http_client.send(request).await?;
571                    if response.status() != 200 {
572                        log::error!("Failed to send events: HTTP {:?}", response.status());
573                    }
574                    anyhow::Ok(())
575                }
576                .log_err(),
577            )
578            .detach();
579    }
580}
581
582pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
583    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
584        return None;
585    };
586
587    let mut summer = Sha256::new();
588    summer.update(checksum_seed);
589    summer.update(json);
590    summer.update(checksum_seed);
591    let mut checksum = String::new();
592    for byte in summer.finalize().as_slice() {
593        use std::fmt::Write;
594        write!(&mut checksum, "{:02x}", byte).unwrap();
595    }
596
597    Some(checksum)
598}
599
600#[cfg(test)]
601mod tests {
602    use super::*;
603    use clock::FakeSystemClock;
604    use gpui::TestAppContext;
605    use http_client::FakeHttpClient;
606
607    #[gpui::test]
608    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
609        init_test(cx);
610        let clock = Arc::new(FakeSystemClock::new());
611        let http = FakeHttpClient::with_200_response();
612        let system_id = Some("system_id".to_string());
613        let installation_id = Some("installation_id".to_string());
614        let session_id = "session_id".to_string();
615
616        cx.update(|cx| {
617            let telemetry = Telemetry::new(clock.clone(), http, cx);
618
619            telemetry.state.lock().max_queue_size = 4;
620            telemetry.start(system_id, installation_id, session_id, cx);
621
622            assert!(is_empty_state(&telemetry));
623
624            let first_date_time = clock.utc_now();
625            let operation = "test".to_string();
626
627            let event = telemetry.report_app_event(operation.clone());
628            assert_eq!(
629                event,
630                Event::App(AppEvent {
631                    operation: operation.clone(),
632                })
633            );
634            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
635            assert!(telemetry.state.lock().flush_events_task.is_some());
636            assert_eq!(
637                telemetry.state.lock().first_event_date_time,
638                Some(first_date_time)
639            );
640
641            clock.advance(Duration::from_millis(100));
642
643            let event = telemetry.report_app_event(operation.clone());
644            assert_eq!(
645                event,
646                Event::App(AppEvent {
647                    operation: operation.clone(),
648                })
649            );
650            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
651            assert!(telemetry.state.lock().flush_events_task.is_some());
652            assert_eq!(
653                telemetry.state.lock().first_event_date_time,
654                Some(first_date_time)
655            );
656
657            clock.advance(Duration::from_millis(100));
658
659            let event = telemetry.report_app_event(operation.clone());
660            assert_eq!(
661                event,
662                Event::App(AppEvent {
663                    operation: operation.clone(),
664                })
665            );
666            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
667            assert!(telemetry.state.lock().flush_events_task.is_some());
668            assert_eq!(
669                telemetry.state.lock().first_event_date_time,
670                Some(first_date_time)
671            );
672
673            clock.advance(Duration::from_millis(100));
674
675            // Adding a 4th event should cause a flush
676            let event = telemetry.report_app_event(operation.clone());
677            assert_eq!(
678                event,
679                Event::App(AppEvent {
680                    operation: operation.clone(),
681                })
682            );
683
684            assert!(is_empty_state(&telemetry));
685        });
686    }
687
688    #[gpui::test]
689    async fn test_telemetry_flush_on_flush_interval(
690        executor: BackgroundExecutor,
691        cx: &mut TestAppContext,
692    ) {
693        init_test(cx);
694        let clock = Arc::new(FakeSystemClock::new());
695        let http = FakeHttpClient::with_200_response();
696        let system_id = Some("system_id".to_string());
697        let installation_id = Some("installation_id".to_string());
698        let session_id = "session_id".to_string();
699
700        cx.update(|cx| {
701            let telemetry = Telemetry::new(clock.clone(), http, cx);
702            telemetry.state.lock().max_queue_size = 4;
703            telemetry.start(system_id, installation_id, session_id, cx);
704
705            assert!(is_empty_state(&telemetry));
706
707            let first_date_time = clock.utc_now();
708            let operation = "test".to_string();
709
710            let event = telemetry.report_app_event(operation.clone());
711            assert_eq!(
712                event,
713                Event::App(AppEvent {
714                    operation: operation.clone(),
715                })
716            );
717            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
718            assert!(telemetry.state.lock().flush_events_task.is_some());
719            assert_eq!(
720                telemetry.state.lock().first_event_date_time,
721                Some(first_date_time)
722            );
723
724            let duration = Duration::from_millis(1);
725
726            // Test 1 millisecond before the flush interval limit is met
727            executor.advance_clock(FLUSH_INTERVAL - duration);
728
729            assert!(!is_empty_state(&telemetry));
730
731            // Test the exact moment the flush interval limit is met
732            executor.advance_clock(duration);
733
734            assert!(is_empty_state(&telemetry));
735        });
736    }
737
738    // TODO:
739    // Test settings
740    // Update FakeHTTPClient to keep track of the number of requests and assert on it
741
742    fn init_test(cx: &mut TestAppContext) {
743        cx.update(|cx| {
744            let settings_store = SettingsStore::test(cx);
745            cx.set_global(settings_store);
746        });
747    }
748
749    fn is_empty_state(telemetry: &Telemetry) -> bool {
750        telemetry.state.lock().events_queue.is_empty()
751            && telemetry.state.lock().flush_events_task.is_none()
752            && telemetry.state.lock().first_event_date_time.is_none()
753    }
754}