telemetry.rs

  1mod event_coalescer;
  2
  3use crate::TelemetrySettings;
  4use anyhow::Result;
  5use clock::SystemClock;
  6use futures::channel::mpsc;
  7use futures::{Future, FutureExt, StreamExt};
  8use gpui::{App, AppContext as _, BackgroundExecutor, Task};
  9use http_client::{self, AsyncBody, HttpClient, HttpClientWithUrl, Method, Request};
 10use parking_lot::Mutex;
 11use release_channel::ReleaseChannel;
 12use settings::{Settings, SettingsStore};
 13use sha2::{Digest, Sha256};
 14use std::collections::{HashMap, HashSet};
 15use std::fs::File;
 16use std::io::Write;
 17use std::sync::LazyLock;
 18use std::time::Instant;
 19use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 20use telemetry_events::{AssistantEventData, AssistantPhase, Event, EventRequestBody, EventWrapper};
 21use util::{ResultExt, TryFutureExt};
 22use worktree::{UpdatedEntriesSet, WorktreeId};
 23
 24use self::event_coalescer::EventCoalescer;
 25
/// Client-side telemetry collector.
///
/// Events are queued in the shared [`TelemetryState`] and periodically
/// serialized and POSTed to the `/telemetry/events` endpoint.
pub struct Telemetry {
    /// Time source used to timestamp events (and handed to the edit-event coalescer).
    clock: Arc<dyn SystemClock>,
    /// HTTP client used to build the telemetry URL and send event batches.
    http_client: Arc<HttpClientWithUrl>,
    /// Executor on which the flush timer and the upload task are spawned.
    executor: BackgroundExecutor,
    /// Mutable state shared between the public API and background tasks.
    state: Arc<Mutex<TelemetryState>>,
}
 32
/// Mutable telemetry state guarded by the mutex in [`Telemetry`].
struct TelemetryState {
    /// Current telemetry settings; refreshed whenever the `SettingsStore` global changes.
    settings: TelemetrySettings,
    system_id: Option<Arc<str>>,       // Per system
    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
    session_id: Option<String>,        // Per app launch
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    /// Display name of the release channel, when one is registered globally.
    release_channel: Option<&'static str>,
    /// CPU architecture, taken from `std::env::consts::ARCH`.
    architecture: &'static str,
    /// Events recorded since the last flush.
    events_queue: Vec<EventWrapper>,
    /// Pending timer task that flushes the queue after `FLUSH_INTERVAL`.
    flush_events_task: Option<Task<()>>,
    /// Local log file that every flushed event is appended to, once it has been created.
    log_file: Option<File>,
    /// Whether the authenticated user is staff; `None` until user info is set.
    is_staff: Option<bool>,
    /// Timestamp of the first event in the current batch; reset on every flush.
    first_event_date_time: Option<Instant>,
    /// Coalesces individual edit events into edit periods.
    event_coalescer: EventCoalescer,
    /// Queue length at which `report_event` flushes immediately.
    max_queue_size: usize,
    /// Tracks which worktrees have already been reported for each known project file.
    worktree_id_map: WorktreeIdMap,

    /// OS name as produced by `os_name()`.
    os_name: String,
    /// Application version string.
    app_version: String,
    /// OS version as produced by `os_version()`; `None` until it has been computed.
    os_version: Option<String>,
}
 54
/// Maps a project marker file name (e.g. `package.json`) to the cache of
/// worktrees already reported for that project type.
#[derive(Debug)]
struct WorktreeIdMap(HashMap<String, ProjectCache>);
 57
/// Per-project-type bookkeeping for "Project Opened" events.
#[derive(Debug)]
struct ProjectCache {
    /// Project type name sent as the `project_type` event property.
    name: String,
    /// Worktrees for which this project type has already been reported.
    worktree_ids_reported: HashSet<WorktreeId>,
}
 63
 64impl ProjectCache {
 65    fn new(name: String) -> Self {
 66        Self {
 67            name,
 68            worktree_ids_reported: HashSet::default(),
 69        }
 70    }
 71}
 72
/// Default queue length at which events are flushed immediately (debug builds).
#[cfg(debug_assertions)]
const MAX_QUEUE_LEN: usize = 5;

/// Default queue length at which events are flushed immediately (release builds).
#[cfg(not(debug_assertions))]
const MAX_QUEUE_LEN: usize = 50;

/// Delay between the first queued event and the timer-driven flush (debug builds).
#[cfg(debug_assertions)]
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);

/// Delay between the first queued event and the timer-driven flush (release builds).
#[cfg(not(debug_assertions))]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 84static ZED_CLIENT_CHECKSUM_SEED: LazyLock<Option<Vec<u8>>> = LazyLock::new(|| {
 85    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 86        .map(|s| s.as_bytes().into())
 87        .or_else(|| {
 88            env::var("ZED_CLIENT_CHECKSUM_SEED")
 89                .ok()
 90                .map(|s| s.as_bytes().into())
 91        })
 92});
 93
 94pub fn os_name() -> String {
 95    #[cfg(target_os = "macos")]
 96    {
 97        "macOS".to_string()
 98    }
 99    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
100    {
101        format!("Linux {}", gpui::guess_compositor())
102    }
103
104    #[cfg(target_os = "windows")]
105    {
106        "Windows".to_string()
107    }
108}
109
110/// Note: This might do blocking IO! Only call from background threads
111pub fn os_version() -> String {
112    #[cfg(target_os = "macos")]
113    {
114        use cocoa::base::nil;
115        use cocoa::foundation::NSProcessInfo;
116
117        unsafe {
118            let process_info = cocoa::foundation::NSProcessInfo::processInfo(nil);
119            let version = process_info.operatingSystemVersion();
120            gpui::SemanticVersion::new(
121                version.majorVersion as usize,
122                version.minorVersion as usize,
123                version.patchVersion as usize,
124            )
125            .to_string()
126        }
127    }
128    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
129    {
130        use std::path::Path;
131
132        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
133            file
134        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
135            file
136        } else {
137            log::error!("Failed to load /etc/os-release, /usr/lib/os-release");
138            "".to_string()
139        };
140        let mut name = "unknown".to_string();
141        let mut version = "unknown".to_string();
142
143        for line in content.lines() {
144            if line.starts_with("ID=") {
145                name = line.trim_start_matches("ID=").trim_matches('"').to_string();
146            }
147            if line.starts_with("VERSION_ID=") {
148                version = line
149                    .trim_start_matches("VERSION_ID=")
150                    .trim_matches('"')
151                    .to_string();
152            }
153        }
154
155        format!("{} {}", name, version)
156    }
157
158    #[cfg(target_os = "windows")]
159    {
160        let mut info = unsafe { std::mem::zeroed() };
161        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
162        if status.is_ok() {
163            gpui::SemanticVersion::new(
164                info.dwMajorVersion as _,
165                info.dwMinorVersion as _,
166                info.dwBuildNumber as _,
167            )
168            .to_string()
169        } else {
170            "unknown".to_string()
171        }
172    }
173}
174
175impl Telemetry {
176    pub fn new(
177        clock: Arc<dyn SystemClock>,
178        client: Arc<HttpClientWithUrl>,
179        cx: &mut App,
180    ) -> Arc<Self> {
181        let release_channel =
182            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
183
184        TelemetrySettings::register(cx);
185
186        let state = Arc::new(Mutex::new(TelemetryState {
187            settings: *TelemetrySettings::get_global(cx),
188            architecture: env::consts::ARCH,
189            release_channel,
190            system_id: None,
191            installation_id: None,
192            session_id: None,
193            metrics_id: None,
194            events_queue: Vec::new(),
195            flush_events_task: None,
196            log_file: None,
197            is_staff: None,
198            first_event_date_time: None,
199            event_coalescer: EventCoalescer::new(clock.clone()),
200            max_queue_size: MAX_QUEUE_LEN,
201            worktree_id_map: WorktreeIdMap(HashMap::from_iter([
202                (
203                    "pnpm-lock.yaml".to_string(),
204                    ProjectCache::new("pnpm".to_string()),
205                ),
206                (
207                    "yarn.lock".to_string(),
208                    ProjectCache::new("yarn".to_string()),
209                ),
210                (
211                    "package.json".to_string(),
212                    ProjectCache::new("node".to_string()),
213                ),
214            ])),
215
216            os_version: None,
217            os_name: os_name(),
218            app_version: release_channel::AppVersion::global(cx).to_string(),
219        }));
220        Self::log_file_path();
221
222        cx.background_spawn({
223            let state = state.clone();
224            let os_version = os_version();
225            state.lock().os_version = Some(os_version.clone());
226            async move {
227                if let Some(tempfile) = File::create(Self::log_file_path()).log_err() {
228                    state.lock().log_file = Some(tempfile);
229                }
230            }
231        })
232        .detach();
233
234        cx.observe_global::<SettingsStore>({
235            let state = state.clone();
236
237            move |cx| {
238                let mut state = state.lock();
239                state.settings = *TelemetrySettings::get_global(cx);
240            }
241        })
242        .detach();
243
244        let this = Arc::new(Self {
245            clock,
246            http_client: client,
247            executor: cx.background_executor().clone(),
248            state,
249        });
250
251        let (tx, mut rx) = mpsc::unbounded();
252        ::telemetry::init(tx);
253
254        cx.background_spawn({
255            let this = Arc::downgrade(&this);
256            async move {
257                while let Some(event) = rx.next().await {
258                    let Some(state) = this.upgrade() else { break };
259                    state.report_event(Event::Flexible(event))
260                }
261            }
262        })
263        .detach();
264
265        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
266        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
267        std::mem::forget(cx.on_app_quit({
268            let this = this.clone();
269            move |_| this.shutdown_telemetry()
270        }));
271
272        this
273    }
274
    /// App-quit hook used in test builds: does nothing and completes immediately.
    #[cfg(any(test, feature = "test-support"))]
    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
        Task::ready(())
    }

    /// App-quit hook: records an "App Closed" event and completes immediately.
    // Skip calling this function in tests.
    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
    #[cfg(not(any(test, feature = "test-support")))]
    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
        telemetry::event!("App Closed");
        // TODO: close final edit period and make sure it's sent
        Task::ready(())
    }
288
289    pub fn log_file_path() -> PathBuf {
290        paths::logs_dir().join("telemetry.log")
291    }
292
293    pub fn has_checksum_seed(&self) -> bool {
294        ZED_CLIENT_CHECKSUM_SEED.is_some()
295    }
296
297    pub fn start(
298        self: &Arc<Self>,
299        system_id: Option<String>,
300        installation_id: Option<String>,
301        session_id: String,
302        cx: &App,
303    ) {
304        let mut state = self.state.lock();
305        state.system_id = system_id.map(|id| id.into());
306        state.installation_id = installation_id.map(|id| id.into());
307        state.session_id = Some(session_id);
308        state.app_version = release_channel::AppVersion::global(cx).to_string();
309        state.os_name = os_name();
310    }
311
312    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
313        let state = self.state.lock();
314        let enabled = state.settings.metrics;
315        drop(state);
316        enabled
317    }
318
319    pub fn set_authenticated_user_info(
320        self: &Arc<Self>,
321        metrics_id: Option<String>,
322        is_staff: bool,
323    ) {
324        let mut state = self.state.lock();
325
326        if !state.settings.metrics {
327            return;
328        }
329
330        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
331        state.metrics_id.clone_from(&metrics_id);
332        state.is_staff = Some(is_staff);
333        drop(state);
334    }
335
    /// Emits a telemetry event describing an assistant interaction.
    ///
    /// The event name is chosen from `event.phase`; the remaining fields of
    /// `event` are attached as event properties.
    pub fn report_assistant_event(self: &Arc<Self>, event: AssistantEventData) {
        // Map the interaction phase to the event name used in telemetry.
        let event_type = match event.phase {
            AssistantPhase::Response => "Assistant Responded",
            AssistantPhase::Invoked => "Assistant Invoked",
            AssistantPhase::Accepted => "Assistant Response Accepted",
            AssistantPhase::Rejected => "Assistant Response Rejected",
        };

        telemetry::event!(
            event_type,
            conversation_id = event.conversation_id,
            kind = event.kind,
            phase = event.phase,
            message_id = event.message_id,
            model = event.model,
            model_provider = event.model_provider,
            response_latency = event.response_latency,
            error_message = event.error_message,
            language_name = event.language_name,
        );
    }
357
358    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str, is_via_ssh: bool) {
359        let mut state = self.state.lock();
360        let period_data = state.event_coalescer.log_event(environment);
361        drop(state);
362
363        if let Some((start, end, environment)) = period_data {
364            let duration = end
365                .saturating_duration_since(start)
366                .min(Duration::from_secs(60 * 60 * 24))
367                .as_millis() as i64;
368
369            telemetry::event!(
370                "Editor Edited",
371                duration = duration,
372                environment = environment.to_string(),
373                is_via_ssh = is_via_ssh
374            );
375        }
376    }
377
378    pub fn report_discovered_project_events(
379        self: &Arc<Self>,
380        worktree_id: WorktreeId,
381        updated_entries_set: &UpdatedEntriesSet,
382    ) {
383        let project_type_names: Vec<String> = {
384            let mut state = self.state.lock();
385            state
386                .worktree_id_map
387                .0
388                .iter_mut()
389                .filter_map(|(project_file_name, project_type_telemetry)| {
390                    if project_type_telemetry
391                        .worktree_ids_reported
392                        .contains(&worktree_id)
393                    {
394                        return None;
395                    }
396
397                    let project_file_found = updated_entries_set.iter().any(|(path, _, _)| {
398                        path.as_ref()
399                            .file_name()
400                            .and_then(|name| name.to_str())
401                            .map(|name_str| name_str == project_file_name)
402                            .unwrap_or(false)
403                    });
404
405                    if !project_file_found {
406                        return None;
407                    }
408
409                    project_type_telemetry
410                        .worktree_ids_reported
411                        .insert(worktree_id);
412
413                    Some(project_type_telemetry.name.clone())
414                })
415                .collect()
416        };
417
418        for project_type_name in project_type_names {
419            telemetry::event!("Project Opened", project_type = project_type_name);
420        }
421    }
422
423    fn report_event(self: &Arc<Self>, event: Event) {
424        let mut state = self.state.lock();
425        // RUST_LOG=telemetry=trace to debug telemetry events
426        log::trace!(target: "telemetry", "{:?}", event);
427
428        if !state.settings.metrics {
429            return;
430        }
431
432        if state.flush_events_task.is_none() {
433            let this = self.clone();
434            let executor = self.executor.clone();
435            state.flush_events_task = Some(self.executor.spawn(async move {
436                executor.timer(FLUSH_INTERVAL).await;
437                this.flush_events().detach();
438            }));
439        }
440
441        let date_time = self.clock.utc_now();
442
443        let milliseconds_since_first_event = match state.first_event_date_time {
444            Some(first_event_date_time) => date_time
445                .saturating_duration_since(first_event_date_time)
446                .min(Duration::from_secs(60 * 60 * 24))
447                .as_millis() as i64,
448            None => {
449                state.first_event_date_time = Some(date_time);
450                0
451            }
452        };
453
454        let signed_in = state.metrics_id.is_some();
455        state.events_queue.push(EventWrapper {
456            signed_in,
457            milliseconds_since_first_event,
458            event,
459        });
460
461        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
462            drop(state);
463            self.flush_events().detach();
464        }
465    }
466
467    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
468        self.state.lock().metrics_id.clone()
469    }
470
471    pub fn system_id(self: &Arc<Self>) -> Option<Arc<str>> {
472        self.state.lock().system_id.clone()
473    }
474
475    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
476        self.state.lock().installation_id.clone()
477    }
478
479    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
480        self.state.lock().is_staff
481    }
482
483    fn build_request(
484        self: &Arc<Self>,
485        // We take in the JSON bytes buffer so we can reuse the existing allocation.
486        mut json_bytes: Vec<u8>,
487        event_request: EventRequestBody,
488    ) -> Result<Request<AsyncBody>> {
489        json_bytes.clear();
490        serde_json::to_writer(&mut json_bytes, &event_request)?;
491
492        let checksum = calculate_json_checksum(&json_bytes).unwrap_or("".to_string());
493
494        Ok(Request::builder()
495            .method(Method::POST)
496            .uri(
497                self.http_client
498                    .build_zed_api_url("/telemetry/events", &[])?
499                    .as_ref(),
500            )
501            .header("Content-Type", "application/json")
502            .header("x-zed-checksum", checksum)
503            .body(json_bytes.into())?)
504    }
505
506    pub fn flush_events(self: &Arc<Self>) -> Task<()> {
507        let mut state = self.state.lock();
508        state.first_event_date_time = None;
509        let mut events = mem::take(&mut state.events_queue);
510        state.flush_events_task.take();
511        drop(state);
512        if events.is_empty() {
513            return Task::ready(());
514        }
515
516        let this = self.clone();
517        self.executor.spawn(
518            async move {
519                let mut json_bytes = Vec::new();
520
521                if let Some(file) = &mut this.state.lock().log_file {
522                    for event in &mut events {
523                        json_bytes.clear();
524                        serde_json::to_writer(&mut json_bytes, event)?;
525                        file.write_all(&json_bytes)?;
526                        file.write_all(b"\n")?;
527                    }
528                }
529
530                let request_body = {
531                    let state = this.state.lock();
532
533                    EventRequestBody {
534                        system_id: state.system_id.as_deref().map(Into::into),
535                        installation_id: state.installation_id.as_deref().map(Into::into),
536                        session_id: state.session_id.clone(),
537                        metrics_id: state.metrics_id.as_deref().map(Into::into),
538                        is_staff: state.is_staff,
539                        app_version: state.app_version.clone(),
540                        os_name: state.os_name.clone(),
541                        os_version: state.os_version.clone(),
542                        architecture: state.architecture.to_string(),
543
544                        release_channel: state.release_channel.map(Into::into),
545                        events,
546                    }
547                };
548
549                let request = this.build_request(json_bytes, request_body)?;
550                let response = this.http_client.send(request).await?;
551                if response.status() != 200 {
552                    log::error!("Failed to send events: HTTP {:?}", response.status());
553                }
554                anyhow::Ok(())
555            }
556            .log_err()
557            .map(|_| ()),
558        )
559    }
560}
561
562pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
563    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
564        return None;
565    };
566
567    let mut summer = Sha256::new();
568    summer.update(checksum_seed);
569    summer.update(json);
570    summer.update(checksum_seed);
571    let mut checksum = String::new();
572    for byte in summer.finalize().as_slice() {
573        use std::fmt::Write;
574        write!(&mut checksum, "{:02x}", byte).unwrap();
575    }
576
577    Some(checksum)
578}
579
580#[cfg(test)]
581mod tests {
582    use super::*;
583    use clock::FakeSystemClock;
584    use gpui::TestAppContext;
585    use http_client::FakeHttpClient;
586    use telemetry_events::FlexibleEvent;
587
588    #[gpui::test]
589    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
590        init_test(cx);
591        let clock = Arc::new(FakeSystemClock::new());
592        let http = FakeHttpClient::with_200_response();
593        let system_id = Some("system_id".to_string());
594        let installation_id = Some("installation_id".to_string());
595        let session_id = "session_id".to_string();
596
597        cx.update(|cx| {
598            let telemetry = Telemetry::new(clock.clone(), http, cx);
599
600            telemetry.state.lock().max_queue_size = 4;
601            telemetry.start(system_id, installation_id, session_id, cx);
602
603            assert!(is_empty_state(&telemetry));
604
605            let first_date_time = clock.utc_now();
606            let event_properties = HashMap::from_iter([(
607                "test_key".to_string(),
608                serde_json::Value::String("test_value".to_string()),
609            )]);
610
611            let event = FlexibleEvent {
612                event_type: "test".to_string(),
613                event_properties,
614            };
615
616            telemetry.report_event(Event::Flexible(event.clone()));
617            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
618            assert!(telemetry.state.lock().flush_events_task.is_some());
619            assert_eq!(
620                telemetry.state.lock().first_event_date_time,
621                Some(first_date_time)
622            );
623
624            clock.advance(Duration::from_millis(100));
625
626            telemetry.report_event(Event::Flexible(event.clone()));
627            assert_eq!(telemetry.state.lock().events_queue.len(), 2);
628            assert!(telemetry.state.lock().flush_events_task.is_some());
629            assert_eq!(
630                telemetry.state.lock().first_event_date_time,
631                Some(first_date_time)
632            );
633
634            clock.advance(Duration::from_millis(100));
635
636            telemetry.report_event(Event::Flexible(event.clone()));
637            assert_eq!(telemetry.state.lock().events_queue.len(), 3);
638            assert!(telemetry.state.lock().flush_events_task.is_some());
639            assert_eq!(
640                telemetry.state.lock().first_event_date_time,
641                Some(first_date_time)
642            );
643
644            clock.advance(Duration::from_millis(100));
645
646            // Adding a 4th event should cause a flush
647            telemetry.report_event(Event::Flexible(event));
648            assert!(is_empty_state(&telemetry));
649        });
650    }
651
652    #[gpui::test]
653    async fn test_telemetry_flush_on_flush_interval(
654        executor: BackgroundExecutor,
655        cx: &mut TestAppContext,
656    ) {
657        init_test(cx);
658        let clock = Arc::new(FakeSystemClock::new());
659        let http = FakeHttpClient::with_200_response();
660        let system_id = Some("system_id".to_string());
661        let installation_id = Some("installation_id".to_string());
662        let session_id = "session_id".to_string();
663
664        cx.update(|cx| {
665            let telemetry = Telemetry::new(clock.clone(), http, cx);
666            telemetry.state.lock().max_queue_size = 4;
667            telemetry.start(system_id, installation_id, session_id, cx);
668
669            assert!(is_empty_state(&telemetry));
670            let first_date_time = clock.utc_now();
671
672            let event_properties = HashMap::from_iter([(
673                "test_key".to_string(),
674                serde_json::Value::String("test_value".to_string()),
675            )]);
676
677            let event = FlexibleEvent {
678                event_type: "test".to_string(),
679                event_properties,
680            };
681
682            telemetry.report_event(Event::Flexible(event));
683            assert_eq!(telemetry.state.lock().events_queue.len(), 1);
684            assert!(telemetry.state.lock().flush_events_task.is_some());
685            assert_eq!(
686                telemetry.state.lock().first_event_date_time,
687                Some(first_date_time)
688            );
689
690            let duration = Duration::from_millis(1);
691
692            // Test 1 millisecond before the flush interval limit is met
693            executor.advance_clock(FLUSH_INTERVAL - duration);
694
695            assert!(!is_empty_state(&telemetry));
696
697            // Test the exact moment the flush interval limit is met
698            executor.advance_clock(duration);
699
700            assert!(is_empty_state(&telemetry));
701        });
702    }
703
704    // TODO:
705    // Test settings
706    // Update FakeHTTPClient to keep track of the number of requests and assert on it
707
708    fn init_test(cx: &mut TestAppContext) {
709        cx.update(|cx| {
710            let settings_store = SettingsStore::test(cx);
711            cx.set_global(settings_store);
712        });
713    }
714
715    fn is_empty_state(telemetry: &Telemetry) -> bool {
716        telemetry.state.lock().events_queue.is_empty()
717            && telemetry.state.lock().flush_events_task.is_none()
718            && telemetry.state.lock().first_event_date_time.is_none()
719    }
720}