telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use anyhow::Result;
  5use clock::SystemClock;
  6use collections::{HashMap, HashSet};
  7use futures::channel::mpsc;
  8use futures::{Future, StreamExt};
  9use gpui::{App, BackgroundExecutor, Task};
 10use http_client::{self, AsyncBody, HttpClient, HttpClientWithUrl, Method, Request};
 11use parking_lot::Mutex;
 12use release_channel::ReleaseChannel;
 13use settings::{Settings, SettingsStore};
 14use sha2::{Digest, Sha256};
 15use std::fs::File;
 16use std::io::Write;
 17use std::sync::LazyLock;
 18use std::time::Instant;
 19use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 20use telemetry_events::{
 21    AppEvent, AssistantEvent, AssistantPhase, EditEvent, Event, EventRequestBody, EventWrapper,
 22};
 23use util::{ResultExt, TryFutureExt};
 24use worktree::{UpdatedEntriesSet, WorktreeId};
 25
 26use self::event_coalescer::EventCoalescer;
 27
/// Client-side telemetry service: queues events and periodically posts them
/// to the telemetry endpoint.
pub struct Telemetry {
    /// Time source used to timestamp reported events.
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to build the telemetry URL and send event batches.
    http_client: Arc<HttpClientWithUrl>,
    /// Executor on which the flush timer and the upload tasks are spawned.
    executor: BackgroundExecutor,
    /// Mutable state shared between the public API and spawned tasks.
    state: Arc<Mutex<TelemetryState>>,
}
 34
/// Mutable telemetry state guarded by the mutex in [`Telemetry`].
struct TelemetryState {
    // Copy of the global telemetry settings; refreshed whenever the settings store changes.
    settings: TelemetrySettings,
    system_id: Option<Arc<str>>,       // Per system
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>,        // Per app launch
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    release_channel: Option<&'static str>,
    architecture: &'static str,
    // Events reported since the last flush.
    events_queue: Vec<EventWrapper>,
    // Pending timer task that flushes the queue after `FLUSH_INTERVAL`; `None` when no flush is scheduled.
    flush_events_task: Option<Task<()>>,
    // Local log file that flushed events are written to before they are sent.
    log_file: Option<File>,
    is_staff: Option<bool>,
    // Timestamp of the first event in the current batch; reset on every flush.
    first_event_date_time: Option<Instant>,
    event_coalescer: EventCoalescer,
    // Queue length at which a flush is triggered immediately (once an installation id is known).
    max_queue_size: usize,
    worktree_id_map: WorktreeIdMap,

    os_name: String,
    app_version: String,
    // Filled in after construction; `None` until then.
    os_version: Option<String>,
}
 56
/// Maps a project marker file name (e.g. `package.json`) to the cache that
/// tracks which worktrees have already been reported for that project type.
#[derive(Debug)]
struct WorktreeIdMap(HashMap<String, ProjectCache>);
 59
 60#[derive(Debug)]
 61struct ProjectCache {
 62    name: String,
 63    worktree_ids_reported: HashSet<WorktreeId>,
 64}
 65
 66impl ProjectCache {
 67    fn new(name: String) -> Self {
 68        Self {
 69            name,
 70            worktree_ids_reported: HashSet::default(),
 71        }
 72    }
 73}
 74
 75#[cfg(debug_assertions)]
 76const MAX_QUEUE_LEN: usize = 5;
 77
 78#[cfg(not(debug_assertions))]
 79const MAX_QUEUE_LEN: usize = 50;
 80
 81#[cfg(debug_assertions)]
 82const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 83
 84#[cfg(not(debug_assertions))]
 85const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 86static ZED_CLIENT_CHECKSUM_SEED: LazyLock<Option<Vec<u8>>> = LazyLock::new(|| {
 87    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 88        .map(|s| s.as_bytes().into())
 89        .or_else(|| {
 90            env::var("ZED_CLIENT_CHECKSUM_SEED")
 91                .ok()
 92                .map(|s| s.as_bytes().into())
 93        })
 94});
 95
 96pub fn os_name() -> String {
 97    #[cfg(target_os = "macos")]
 98    {
 99        "macOS".to_string()
100    }
101    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
102    {
103        format!("Linux {}", gpui::guess_compositor())
104    }
105
106    #[cfg(target_os = "windows")]
107    {
108        "Windows".to_string()
109    }
110}
111
112/// Note: This might do blocking IO! Only call from background threads
113pub fn os_version() -> String {
114    #[cfg(target_os = "macos")]
115    {
116        use cocoa::base::nil;
117        use cocoa::foundation::NSProcessInfo;
118
119        unsafe {
120            let process_info = cocoa::foundation::NSProcessInfo::processInfo(nil);
121            let version = process_info.operatingSystemVersion();
122            gpui::SemanticVersion::new(
123                version.majorVersion as usize,
124                version.minorVersion as usize,
125                version.patchVersion as usize,
126            )
127            .to_string()
128        }
129    }
130    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
131    {
132        use std::path::Path;
133
134        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
135            file
136        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
137            file
138        } else {
139            log::error!("Failed to load /etc/os-release, /usr/lib/os-release");
140            "".to_string()
141        };
142        let mut name = "unknown".to_string();
143        let mut version = "unknown".to_string();
144
145        for line in content.lines() {
146            if line.starts_with("ID=") {
147                name = line.trim_start_matches("ID=").trim_matches('"').to_string();
148            }
149            if line.starts_with("VERSION_ID=") {
150                version = line
151                    .trim_start_matches("VERSION_ID=")
152                    .trim_matches('"')
153                    .to_string();
154            }
155        }
156
157        format!("{} {}", name, version)
158    }
159
160    #[cfg(target_os = "windows")]
161    {
162        let mut info = unsafe { std::mem::zeroed() };
163        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
164        if status.is_ok() {
165            gpui::SemanticVersion::new(
166                info.dwMajorVersion as _,
167                info.dwMinorVersion as _,
168                info.dwBuildNumber as _,
169            )
170            .to_string()
171        } else {
172            "unknown".to_string()
173        }
174    }
175}
176
177impl Telemetry {
178    pub fn new(
179        clock: Arc<dyn SystemClock>,
180        client: Arc<HttpClientWithUrl>,
181        cx: &mut App,
182    ) -> Arc<Self> {
183        let release_channel =
184            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
185
186        TelemetrySettings::register(cx);
187
188        let state = Arc::new(Mutex::new(TelemetryState {
189            settings: *TelemetrySettings::get_global(cx),
190            architecture: env::consts::ARCH,
191            release_channel,
192            system_id: None,
193            installation_id: None,
194            session_id: None,
195            metrics_id: None,
196            events_queue: Vec::new(),
197            flush_events_task: None,
198            log_file: None,
199            is_staff: None,
200            first_event_date_time: None,
201            event_coalescer: EventCoalescer::new(clock.clone()),
202            max_queue_size: MAX_QUEUE_LEN,
203            worktree_id_map: WorktreeIdMap(HashMap::from_iter([
204                (
205                    "pnpm-lock.yaml".to_string(),
206                    ProjectCache::new("pnpm".to_string()),
207                ),
208                (
209                    "yarn.lock".to_string(),
210                    ProjectCache::new("yarn".to_string()),
211                ),
212                (
213                    "package.json".to_string(),
214                    ProjectCache::new("node".to_string()),
215                ),
216            ])),
217
218            os_version: None,
219            os_name: os_name(),
220            app_version: release_channel::AppVersion::global(cx).to_string(),
221        }));
222        Self::log_file_path();
223
224        cx.background_executor()
225            .spawn({
226                let state = state.clone();
227                let os_version = os_version();
228                state.lock().os_version = Some(os_version.clone());
229                async move {
230                    if let Some(tempfile) = File::create(Self::log_file_path()).log_err() {
231                        state.lock().log_file = Some(tempfile);
232                    }
233                }
234            })
235            .detach();
236
237        cx.observe_global::<SettingsStore>({
238            let state = state.clone();
239
240            move |cx| {
241                let mut state = state.lock();
242                state.settings = *TelemetrySettings::get_global(cx);
243            }
244        })
245        .detach();
246
247        let this = Arc::new(Self {
248            clock,
249            http_client: client,
250            executor: cx.background_executor().clone(),
251            state,
252        });
253
254        let (tx, mut rx) = mpsc::unbounded();
255        ::telemetry::init(tx);
256
257        cx.background_executor()
258            .spawn({
259                let this = Arc::downgrade(&this);
260                async move {
261                    while let Some(event) = rx.next().await {
262                        let Some(state) = this.upgrade() else { break };
263                        state.report_event(Event::Flexible(event))
264                    }
265                }
266            })
267            .detach();
268
269        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
270        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
271        std::mem::forget(cx.on_app_quit({
272            let this = this.clone();
273            move |_| this.shutdown_telemetry()
274        }));
275
276        this
277    }
278
279    #[cfg(any(test, feature = "test-support"))]
280    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
281        Task::ready(())
282    }
283
284    // Skip calling this function in tests.
285    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
286    #[cfg(not(any(test, feature = "test-support")))]
287    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
288        self.report_app_event("close".to_string());
289        // TODO: close final edit period and make sure it's sent
290        Task::ready(())
291    }
292
293    pub fn log_file_path() -> PathBuf {
294        paths::logs_dir().join("telemetry.log")
295    }
296
297    pub fn start(
298        self: &Arc<Self>,
299        system_id: Option<String>,
300        installation_id: Option<String>,
301        session_id: String,
302        cx: &App,
303    ) {
304        let mut state = self.state.lock();
305        state.system_id = system_id.map(|id| id.into());
306        state.installation_id = installation_id.map(|id| id.into());
307        state.session_id = Some(session_id);
308        state.app_version = release_channel::AppVersion::global(cx).to_string();
309        state.os_name = os_name();
310    }
311
312    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
313        let state = self.state.lock();
314        let enabled = state.settings.metrics;
315        drop(state);
316        enabled
317    }
318
319    pub fn set_authenticated_user_info(
320        self: &Arc<Self>,
321        metrics_id: Option<String>,
322        is_staff: bool,
323    ) {
324        let mut state = self.state.lock();
325
326        if !state.settings.metrics {
327            return;
328        }
329
330        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
331        state.metrics_id.clone_from(&metrics_id);
332        state.is_staff = Some(is_staff);
333        drop(state);
334    }
335
336    pub fn report_assistant_event(self: &Arc<Self>, event: AssistantEvent) {
337        let event_type = match event.phase {
338            AssistantPhase::Response => "Assistant Responded",
339            AssistantPhase::Invoked => "Assistant Invoked",
340            AssistantPhase::Accepted => "Assistant Response Accepted",
341            AssistantPhase::Rejected => "Assistant Response Rejected",
342        };
343
344        telemetry::event!(
345            event_type,
346            conversation_id = event.conversation_id,
347            kind = event.kind,
348            phase = event.phase,
349            message_id = event.message_id,
350            model = event.model,
351            model_provider = event.model_provider,
352            response_latency = event.response_latency,
353            error_message = event.error_message,
354            language_name = event.language_name,
355        );
356    }
357
358    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
359        let event = Event::App(AppEvent { operation });
360
361        self.report_event(event.clone());
362
363        event
364    }
365
366    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str, is_via_ssh: bool) {
367        let mut state = self.state.lock();
368        let period_data = state.event_coalescer.log_event(environment);
369        drop(state);
370
371        if let Some((start, end, environment)) = period_data {
372            let event = Event::Edit(EditEvent {
373                duration: end
374                    .saturating_duration_since(start)
375                    .min(Duration::from_secs(60 * 60 * 24))
376                    .as_millis() as i64,
377                environment: environment.to_string(),
378                is_via_ssh,
379            });
380
381            self.report_event(event);
382        }
383    }
384
385    pub fn report_discovered_project_events(
386        self: &Arc<Self>,
387        worktree_id: WorktreeId,
388        updated_entries_set: &UpdatedEntriesSet,
389    ) {
390        let project_type_names: Vec<String> = {
391            let mut state = self.state.lock();
392            state
393                .worktree_id_map
394                .0
395                .iter_mut()
396                .filter_map(|(project_file_name, project_type_telemetry)| {
397                    if project_type_telemetry
398                        .worktree_ids_reported
399                        .contains(&worktree_id)
400                    {
401                        return None;
402                    }
403
404                    let project_file_found = updated_entries_set.iter().any(|(path, _, _)| {
405                        path.as_ref()
406                            .file_name()
407                            .and_then(|name| name.to_str())
408                            .map(|name_str| name_str == project_file_name)
409                            .unwrap_or(false)
410                    });
411
412                    if !project_file_found {
413                        return None;
414                    }
415
416                    project_type_telemetry
417                        .worktree_ids_reported
418                        .insert(worktree_id);
419
420                    Some(project_type_telemetry.name.clone())
421                })
422                .collect()
423        };
424
425        // Done on purpose to avoid calling `self.state.lock()` multiple times
426        for project_type_name in project_type_names {
427            self.report_app_event(format!("open {} project", project_type_name));
428        }
429    }
430
431    fn report_event(self: &Arc<Self>, event: Event) {
432        let mut state = self.state.lock();
433
434        if !state.settings.metrics {
435            return;
436        }
437
438        if state.flush_events_task.is_none() {
439            let this = self.clone();
440            let executor = self.executor.clone();
441            state.flush_events_task = Some(self.executor.spawn(async move {
442                executor.timer(FLUSH_INTERVAL).await;
443                this.flush_events();
444            }));
445        }
446
447        let date_time = self.clock.utc_now();
448
449        let milliseconds_since_first_event = match state.first_event_date_time {
450            Some(first_event_date_time) => date_time
451                .saturating_duration_since(first_event_date_time)
452                .min(Duration::from_secs(60 * 60 * 24))
453                .as_millis() as i64,
454            None => {
455                state.first_event_date_time = Some(date_time);
456                0
457            }
458        };
459
460        let signed_in = state.metrics_id.is_some();
461        state.events_queue.push(EventWrapper {
462            signed_in,
463            milliseconds_since_first_event,
464            event,
465        });
466
467        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
468            drop(state);
469            self.flush_events();
470        }
471    }
472
473    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
474        self.state.lock().metrics_id.clone()
475    }
476
477    pub fn system_id(self: &Arc<Self>) -> Option<Arc<str>> {
478        self.state.lock().system_id.clone()
479    }
480
481    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
482        self.state.lock().installation_id.clone()
483    }
484
485    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
486        self.state.lock().is_staff
487    }
488
489    fn build_request(
490        self: &Arc<Self>,
491        // We take in the JSON bytes buffer so we can reuse the existing allocation.
492        mut json_bytes: Vec<u8>,
493        event_request: EventRequestBody,
494    ) -> Result<Request<AsyncBody>> {
495        json_bytes.clear();
496        serde_json::to_writer(&mut json_bytes, &event_request)?;
497
498        let checksum = calculate_json_checksum(&json_bytes).unwrap_or("".to_string());
499
500        Ok(Request::builder()
501            .method(Method::POST)
502            .uri(
503                self.http_client
504                    .build_zed_api_url("/telemetry/events", &[])?
505                    .as_ref(),
506            )
507            .header("Content-Type", "application/json")
508            .header("x-zed-checksum", checksum)
509            .body(json_bytes.into())?)
510    }
511
512    pub fn flush_events(self: &Arc<Self>) {
513        let mut state = self.state.lock();
514        state.first_event_date_time = None;
515        let mut events = mem::take(&mut state.events_queue);
516        state.flush_events_task.take();
517        drop(state);
518        if events.is_empty() {
519            return;
520        }
521
522        let this = self.clone();
523        self.executor
524            .spawn(
525                async move {
526                    let mut json_bytes = Vec::new();
527
528                    if let Some(file) = &mut this.state.lock().log_file {
529                        for event in &mut events {
530                            json_bytes.clear();
531                            serde_json::to_writer(&mut json_bytes, event)?;
532                            file.write_all(&json_bytes)?;
533                            file.write_all(b"\n")?;
534                        }
535                    }
536
537                    let request_body = {
538                        let state = this.state.lock();
539
540                        EventRequestBody {
541                            system_id: state.system_id.as_deref().map(Into::into),
542                            installation_id: state.installation_id.as_deref().map(Into::into),
543                            session_id: state.session_id.clone(),
544                            metrics_id: state.metrics_id.as_deref().map(Into::into),
545                            is_staff: state.is_staff,
546                            app_version: state.app_version.clone(),
547                            os_name: state.os_name.clone(),
548                            os_version: state.os_version.clone(),
549                            architecture: state.architecture.to_string(),
550
551                            release_channel: state.release_channel.map(Into::into),
552                            events,
553                        }
554                    };
555
556                    let request = this.build_request(json_bytes, request_body)?;
557                    let response = this.http_client.send(request).await?;
558                    if response.status() != 200 {
559                        log::error!("Failed to send events: HTTP {:?}", response.status());
560                    }
561                    anyhow::Ok(())
562                }
563                .log_err(),
564            )
565            .detach();
566    }
567}
568
569pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
570    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
571        return None;
572    };
573
574    let mut summer = Sha256::new();
575    summer.update(checksum_seed);
576    summer.update(json);
577    summer.update(checksum_seed);
578    let mut checksum = String::new();
579    for byte in summer.finalize().as_slice() {
580        use std::fmt::Write;
581        write!(&mut checksum, "{:02x}", byte).unwrap();
582    }
583
584    Some(checksum)
585}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590    use clock::FakeSystemClock;
591    use gpui::TestAppContext;
592    use http_client::FakeHttpClient;
593
594    #[gpui::test]
595    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
596        init_test(cx);
597        let clock = Arc::new(FakeSystemClock::new());
598        let http = FakeHttpClient::with_200_response();
599        let system_id = Some("system_id".to_string());
600        let installation_id = Some("installation_id".to_string());
601        let session_id = "session_id".to_string();
602
603        cx.update(|cx| {
604            let telemetry = Telemetry::new(clock.clone(), http, cx);
605
606            telemetry.state.lock().max_queue_size = 4;
607            telemetry.start(system_id, installation_id, session_id, cx);
608
609            assert!(is_empty_state(&telemetry));
610
611            let first_date_time = clock.utc_now();
612            let operation = "test".to_string();
613
614            let event = telemetry.report_app_event(operation.clone());
615            assert_eq!(
616                event,
617                Event::App(AppEvent {
618                    operation: operation.clone(),
619                })
620            );
621            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
622            assert!(telemetry.state.lock().flush_events_task.is_some());
623            assert_eq!(
624                telemetry.state.lock().first_event_date_time,
625                Some(first_date_time)
626            );
627
628            clock.advance(Duration::from_millis(100));
629
630            let event = telemetry.report_app_event(operation.clone());
631            assert_eq!(
632                event,
633                Event::App(AppEvent {
634                    operation: operation.clone(),
635                })
636            );
637            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
638            assert!(telemetry.state.lock().flush_events_task.is_some());
639            assert_eq!(
640                telemetry.state.lock().first_event_date_time,
641                Some(first_date_time)
642            );
643
644            clock.advance(Duration::from_millis(100));
645
646            let event = telemetry.report_app_event(operation.clone());
647            assert_eq!(
648                event,
649                Event::App(AppEvent {
650                    operation: operation.clone(),
651                })
652            );
653            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
654            assert!(telemetry.state.lock().flush_events_task.is_some());
655            assert_eq!(
656                telemetry.state.lock().first_event_date_time,
657                Some(first_date_time)
658            );
659
660            clock.advance(Duration::from_millis(100));
661
662            // Adding a 4th event should cause a flush
663            let event = telemetry.report_app_event(operation.clone());
664            assert_eq!(
665                event,
666                Event::App(AppEvent {
667                    operation: operation.clone(),
668                })
669            );
670
671            assert!(is_empty_state(&telemetry));
672        });
673    }
674
675    #[gpui::test]
676    async fn test_telemetry_flush_on_flush_interval(
677        executor: BackgroundExecutor,
678        cx: &mut TestAppContext,
679    ) {
680        init_test(cx);
681        let clock = Arc::new(FakeSystemClock::new());
682        let http = FakeHttpClient::with_200_response();
683        let system_id = Some("system_id".to_string());
684        let installation_id = Some("installation_id".to_string());
685        let session_id = "session_id".to_string();
686
687        cx.update(|cx| {
688            let telemetry = Telemetry::new(clock.clone(), http, cx);
689            telemetry.state.lock().max_queue_size = 4;
690            telemetry.start(system_id, installation_id, session_id, cx);
691
692            assert!(is_empty_state(&telemetry));
693
694            let first_date_time = clock.utc_now();
695            let operation = "test".to_string();
696
697            let event = telemetry.report_app_event(operation.clone());
698            assert_eq!(
699                event,
700                Event::App(AppEvent {
701                    operation: operation.clone(),
702                })
703            );
704            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
705            assert!(telemetry.state.lock().flush_events_task.is_some());
706            assert_eq!(
707                telemetry.state.lock().first_event_date_time,
708                Some(first_date_time)
709            );
710
711            let duration = Duration::from_millis(1);
712
713            // Test 1 millisecond before the flush interval limit is met
714            executor.advance_clock(FLUSH_INTERVAL - duration);
715
716            assert!(!is_empty_state(&telemetry));
717
718            // Test the exact moment the flush interval limit is met
719            executor.advance_clock(duration);
720
721            assert!(is_empty_state(&telemetry));
722        });
723    }
724
725    // TODO:
726    // Test settings
727    // Update FakeHTTPClient to keep track of the number of requests and assert on it
728
729    fn init_test(cx: &mut TestAppContext) {
730        cx.update(|cx| {
731            let settings_store = SettingsStore::test(cx);
732            cx.set_global(settings_store);
733        });
734    }
735
736    fn is_empty_state(telemetry: &Telemetry) -> bool {
737        telemetry.state.lock().events_queue.is_empty()
738            && telemetry.state.lock().flush_events_task.is_none()
739            && telemetry.state.lock().first_event_date_time.is_none()
740    }
741}