telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{ChannelId, TelemetrySettings};
  4use anyhow::Result;
  5use clock::SystemClock;
  6use collections::{HashMap, HashSet};
  7use futures::Future;
  8use gpui::{AppContext, BackgroundExecutor, Task};
  9use http_client::{self, AsyncBody, HttpClient, HttpClientWithUrl, Method, Request};
 10use once_cell::sync::Lazy;
 11use parking_lot::Mutex;
 12use release_channel::ReleaseChannel;
 13use settings::{Settings, SettingsStore};
 14use sha2::{Digest, Sha256};
 15use std::fs::File;
 16use std::io::Write;
 17use std::time::Instant;
 18use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 19use telemetry_events::{
 20    ActionEvent, AppEvent, AssistantEvent, CallEvent, EditEvent, EditorEvent, Event,
 21    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, ReplEvent, SettingEvent,
 22};
 23use util::{ResultExt, TryFutureExt};
 24use worktree::{UpdatedEntriesSet, WorktreeId};
 25
 26use self::event_coalescer::EventCoalescer;
 27
 28pub struct Telemetry {
 29    clock: Arc<dyn SystemClock>,
 30    http_client: Arc<HttpClientWithUrl>,
 31    executor: BackgroundExecutor,
 32    state: Arc<Mutex<TelemetryState>>,
 33}
 34
 35struct TelemetryState {
 36    settings: TelemetrySettings,
 37    system_id: Option<Arc<str>>,       // Per system
 38    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 39    session_id: Option<String>,        // Per app launch
 40    metrics_id: Option<Arc<str>>,      // Per logged-in user
 41    release_channel: Option<&'static str>,
 42    architecture: &'static str,
 43    events_queue: Vec<EventWrapper>,
 44    flush_events_task: Option<Task<()>>,
 45    log_file: Option<File>,
 46    is_staff: Option<bool>,
 47    first_event_date_time: Option<Instant>,
 48    event_coalescer: EventCoalescer,
 49    max_queue_size: usize,
 50    worktree_id_map: WorktreeIdMap,
 51
 52    os_name: String,
 53    app_version: String,
 54    os_version: Option<String>,
 55}
 56
 57#[derive(Debug)]
 58struct WorktreeIdMap(HashMap<String, ProjectCache>);
 59
 60#[derive(Debug)]
 61struct ProjectCache {
 62    name: String,
 63    worktree_ids_reported: HashSet<WorktreeId>,
 64}
 65
 66impl ProjectCache {
 67    fn new(name: String) -> Self {
 68        Self {
 69            name,
 70            worktree_ids_reported: HashSet::default(),
 71        }
 72    }
 73}
 74
 75#[cfg(debug_assertions)]
 76const MAX_QUEUE_LEN: usize = 5;
 77
 78#[cfg(not(debug_assertions))]
 79const MAX_QUEUE_LEN: usize = 50;
 80
 81#[cfg(debug_assertions)]
 82const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 83
 84#[cfg(not(debug_assertions))]
 85const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
 86static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
 87    option_env!("ZED_CLIENT_CHECKSUM_SEED")
 88        .map(|s| s.as_bytes().into())
 89        .or_else(|| {
 90            env::var("ZED_CLIENT_CHECKSUM_SEED")
 91                .ok()
 92                .map(|s| s.as_bytes().into())
 93        })
 94});
 95
 96pub fn os_name() -> String {
 97    #[cfg(target_os = "macos")]
 98    {
 99        "macOS".to_string()
100    }
101    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
102    {
103        format!("Linux {}", gpui::guess_compositor())
104    }
105
106    #[cfg(target_os = "windows")]
107    {
108        "Windows".to_string()
109    }
110}
111
112/// Note: This might do blocking IO! Only call from background threads
113pub fn os_version() -> String {
114    #[cfg(target_os = "macos")]
115    {
116        use cocoa::base::nil;
117        use cocoa::foundation::NSProcessInfo;
118
119        unsafe {
120            let process_info = cocoa::foundation::NSProcessInfo::processInfo(nil);
121            let version = process_info.operatingSystemVersion();
122            gpui::SemanticVersion::new(
123                version.majorVersion as usize,
124                version.minorVersion as usize,
125                version.patchVersion as usize,
126            )
127            .to_string()
128        }
129    }
130    #[cfg(any(target_os = "linux", target_os = "freebsd"))]
131    {
132        use std::path::Path;
133
134        let content = if let Ok(file) = std::fs::read_to_string(&Path::new("/etc/os-release")) {
135            file
136        } else if let Ok(file) = std::fs::read_to_string(&Path::new("/usr/lib/os-release")) {
137            file
138        } else {
139            log::error!("Failed to load /etc/os-release, /usr/lib/os-release");
140            "".to_string()
141        };
142        let mut name = "unknown".to_string();
143        let mut version = "unknown".to_string();
144
145        for line in content.lines() {
146            if line.starts_with("ID=") {
147                name = line.trim_start_matches("ID=").trim_matches('"').to_string();
148            }
149            if line.starts_with("VERSION_ID=") {
150                version = line
151                    .trim_start_matches("VERSION_ID=")
152                    .trim_matches('"')
153                    .to_string();
154            }
155        }
156
157        format!("{} {}", name, version)
158    }
159
160    #[cfg(target_os = "windows")]
161    {
162        let mut info = unsafe { std::mem::zeroed() };
163        let status = unsafe { windows::Wdk::System::SystemServices::RtlGetVersion(&mut info) };
164        if status.is_ok() {
165            gpui::SemanticVersion::new(
166                info.dwMajorVersion as _,
167                info.dwMinorVersion as _,
168                info.dwBuildNumber as _,
169            )
170            .to_string()
171        } else {
172            "unknown".to_string()
173        }
174    }
175}
176
177impl Telemetry {
178    pub fn new(
179        clock: Arc<dyn SystemClock>,
180        client: Arc<HttpClientWithUrl>,
181        cx: &mut AppContext,
182    ) -> Arc<Self> {
183        let release_channel =
184            ReleaseChannel::try_global(cx).map(|release_channel| release_channel.display_name());
185
186        TelemetrySettings::register(cx);
187
188        let state = Arc::new(Mutex::new(TelemetryState {
189            settings: *TelemetrySettings::get_global(cx),
190            architecture: env::consts::ARCH,
191            release_channel,
192            system_id: None,
193            installation_id: None,
194            session_id: None,
195            metrics_id: None,
196            events_queue: Vec::new(),
197            flush_events_task: None,
198            log_file: None,
199            is_staff: None,
200            first_event_date_time: None,
201            event_coalescer: EventCoalescer::new(clock.clone()),
202            max_queue_size: MAX_QUEUE_LEN,
203            worktree_id_map: WorktreeIdMap(HashMap::from_iter([
204                (
205                    "pnpm-lock.yaml".to_string(),
206                    ProjectCache::new("pnpm".to_string()),
207                ),
208                (
209                    "yarn.lock".to_string(),
210                    ProjectCache::new("yarn".to_string()),
211                ),
212                (
213                    "package.json".to_string(),
214                    ProjectCache::new("node".to_string()),
215                ),
216            ])),
217
218            os_version: None,
219            os_name: os_name(),
220            app_version: release_channel::AppVersion::global(cx).to_string(),
221        }));
222        Self::log_file_path();
223
224        cx.background_executor()
225            .spawn({
226                let state = state.clone();
227                let os_version = os_version();
228                state.lock().os_version = Some(os_version.clone());
229                async move {
230                    if let Some(tempfile) = File::create(Self::log_file_path()).log_err() {
231                        state.lock().log_file = Some(tempfile);
232                    }
233                }
234            })
235            .detach();
236
237        cx.observe_global::<SettingsStore>({
238            let state = state.clone();
239
240            move |cx| {
241                let mut state = state.lock();
242                state.settings = *TelemetrySettings::get_global(cx);
243            }
244        })
245        .detach();
246
247        // TODO: Replace all hardware stuff with nested SystemSpecs json
248        let this = Arc::new(Self {
249            clock,
250            http_client: client,
251            executor: cx.background_executor().clone(),
252            state,
253        });
254
255        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
256        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
257        std::mem::forget(cx.on_app_quit({
258            let this = this.clone();
259            move |_| this.shutdown_telemetry()
260        }));
261
262        this
263    }
264
265    #[cfg(any(test, feature = "test-support"))]
266    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
267        Task::ready(())
268    }
269
270    // Skip calling this function in tests.
271    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
272    #[cfg(not(any(test, feature = "test-support")))]
273    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
274        self.report_app_event("close".to_string());
275        // TODO: close final edit period and make sure it's sent
276        Task::ready(())
277    }
278
279    pub fn log_file_path() -> PathBuf {
280        paths::logs_dir().join("telemetry.log")
281    }
282
283    pub fn start(
284        self: &Arc<Self>,
285        system_id: Option<String>,
286        installation_id: Option<String>,
287        session_id: String,
288        cx: &AppContext,
289    ) {
290        let mut state = self.state.lock();
291        state.system_id = system_id.map(|id| id.into());
292        state.installation_id = installation_id.map(|id| id.into());
293        state.session_id = Some(session_id);
294        state.app_version = release_channel::AppVersion::global(cx).to_string();
295        state.os_name = os_name();
296    }
297
298    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
299        let state = self.state.lock();
300        let enabled = state.settings.metrics;
301        drop(state);
302        enabled
303    }
304
305    pub fn set_authenticated_user_info(
306        self: &Arc<Self>,
307        metrics_id: Option<String>,
308        is_staff: bool,
309    ) {
310        let mut state = self.state.lock();
311
312        if !state.settings.metrics {
313            return;
314        }
315
316        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
317        state.metrics_id.clone_from(&metrics_id);
318        state.is_staff = Some(is_staff);
319        drop(state);
320    }
321
322    pub fn report_editor_event(
323        self: &Arc<Self>,
324        file_extension: Option<String>,
325        vim_mode: bool,
326        operation: &'static str,
327        copilot_enabled: bool,
328        copilot_enabled_for_language: bool,
329        is_via_ssh: bool,
330    ) {
331        let event = Event::Editor(EditorEvent {
332            file_extension,
333            vim_mode,
334            operation: operation.into(),
335            copilot_enabled,
336            copilot_enabled_for_language,
337            is_via_ssh,
338        });
339
340        self.report_event(event)
341    }
342
343    pub fn report_inline_completion_event(
344        self: &Arc<Self>,
345        provider: String,
346        suggestion_accepted: bool,
347        file_extension: Option<String>,
348    ) {
349        let event = Event::InlineCompletion(InlineCompletionEvent {
350            provider,
351            suggestion_accepted,
352            file_extension,
353        });
354
355        self.report_event(event)
356    }
357
358    pub fn report_assistant_event(self: &Arc<Self>, event: AssistantEvent) {
359        self.report_event(Event::Assistant(event));
360    }
361
362    pub fn report_call_event(
363        self: &Arc<Self>,
364        operation: &'static str,
365        room_id: Option<u64>,
366        channel_id: Option<ChannelId>,
367    ) {
368        let event = Event::Call(CallEvent {
369            operation: operation.to_string(),
370            room_id,
371            channel_id: channel_id.map(|cid| cid.0),
372        });
373
374        self.report_event(event)
375    }
376
377    pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
378        let event = Event::App(AppEvent { operation });
379
380        self.report_event(event.clone());
381
382        event
383    }
384
385    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
386        let event = Event::Setting(SettingEvent {
387            setting: setting.to_string(),
388            value,
389        });
390
391        self.report_event(event)
392    }
393
394    pub fn report_extension_event(self: &Arc<Self>, extension_id: Arc<str>, version: Arc<str>) {
395        self.report_event(Event::Extension(ExtensionEvent {
396            extension_id,
397            version,
398        }))
399    }
400
401    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str, is_via_ssh: bool) {
402        let mut state = self.state.lock();
403        let period_data = state.event_coalescer.log_event(environment);
404        drop(state);
405
406        if let Some((start, end, environment)) = period_data {
407            let event = Event::Edit(EditEvent {
408                duration: end
409                    .saturating_duration_since(start)
410                    .min(Duration::from_secs(60 * 60 * 24))
411                    .as_millis() as i64,
412                environment: environment.to_string(),
413                is_via_ssh,
414            });
415
416            self.report_event(event);
417        }
418    }
419
420    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
421        let event = Event::Action(ActionEvent {
422            source: source.to_string(),
423            action,
424        });
425
426        self.report_event(event)
427    }
428
429    pub fn report_discovered_project_events(
430        self: &Arc<Self>,
431        worktree_id: WorktreeId,
432        updated_entries_set: &UpdatedEntriesSet,
433    ) {
434        let project_type_names: Vec<String> = {
435            let mut state = self.state.lock();
436            state
437                .worktree_id_map
438                .0
439                .iter_mut()
440                .filter_map(|(project_file_name, project_type_telemetry)| {
441                    if project_type_telemetry
442                        .worktree_ids_reported
443                        .contains(&worktree_id)
444                    {
445                        return None;
446                    }
447
448                    let project_file_found = updated_entries_set.iter().any(|(path, _, _)| {
449                        path.as_ref()
450                            .file_name()
451                            .and_then(|name| name.to_str())
452                            .map(|name_str| name_str == project_file_name)
453                            .unwrap_or(false)
454                    });
455
456                    if !project_file_found {
457                        return None;
458                    }
459
460                    project_type_telemetry
461                        .worktree_ids_reported
462                        .insert(worktree_id);
463
464                    Some(project_type_telemetry.name.clone())
465                })
466                .collect()
467        };
468
469        // Done on purpose to avoid calling `self.state.lock()` multiple times
470        for project_type_name in project_type_names {
471            self.report_app_event(format!("open {} project", project_type_name));
472        }
473    }
474
475    pub fn report_repl_event(
476        self: &Arc<Self>,
477        kernel_language: String,
478        kernel_status: String,
479        repl_session_id: String,
480    ) {
481        let event = Event::Repl(ReplEvent {
482            kernel_language,
483            kernel_status,
484            repl_session_id,
485        });
486
487        self.report_event(event)
488    }
489
490    fn report_event(self: &Arc<Self>, event: Event) {
491        let mut state = self.state.lock();
492
493        if !state.settings.metrics {
494            return;
495        }
496
497        if state.flush_events_task.is_none() {
498            let this = self.clone();
499            let executor = self.executor.clone();
500            state.flush_events_task = Some(self.executor.spawn(async move {
501                executor.timer(FLUSH_INTERVAL).await;
502                this.flush_events();
503            }));
504        }
505
506        let date_time = self.clock.utc_now();
507
508        let milliseconds_since_first_event = match state.first_event_date_time {
509            Some(first_event_date_time) => date_time
510                .saturating_duration_since(first_event_date_time)
511                .min(Duration::from_secs(60 * 60 * 24))
512                .as_millis() as i64,
513            None => {
514                state.first_event_date_time = Some(date_time);
515                0
516            }
517        };
518
519        let signed_in = state.metrics_id.is_some();
520        state.events_queue.push(EventWrapper {
521            signed_in,
522            milliseconds_since_first_event,
523            event,
524        });
525
526        if state.installation_id.is_some() && state.events_queue.len() >= state.max_queue_size {
527            drop(state);
528            self.flush_events();
529        }
530    }
531
532    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
533        self.state.lock().metrics_id.clone()
534    }
535
536    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
537        self.state.lock().installation_id.clone()
538    }
539
540    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
541        self.state.lock().is_staff
542    }
543
544    fn build_request(
545        self: &Arc<Self>,
546        // We take in the JSON bytes buffer so we can reuse the existing allocation.
547        mut json_bytes: Vec<u8>,
548        event_request: EventRequestBody,
549    ) -> Result<Request<AsyncBody>> {
550        json_bytes.clear();
551        serde_json::to_writer(&mut json_bytes, &event_request)?;
552
553        let checksum = calculate_json_checksum(&json_bytes).unwrap_or("".to_string());
554
555        Ok(Request::builder()
556            .method(Method::POST)
557            .uri(
558                self.http_client
559                    .build_zed_api_url("/telemetry/events", &[])?
560                    .as_ref(),
561            )
562            .header("Content-Type", "application/json")
563            .header("x-zed-checksum", checksum)
564            .body(json_bytes.into())?)
565    }
566
567    pub fn flush_events(self: &Arc<Self>) {
568        let mut state = self.state.lock();
569        state.first_event_date_time = None;
570        let mut events = mem::take(&mut state.events_queue);
571        state.flush_events_task.take();
572        drop(state);
573        if events.is_empty() {
574            return;
575        }
576
577        let this = self.clone();
578        self.executor
579            .spawn(
580                async move {
581                    let mut json_bytes = Vec::new();
582
583                    if let Some(file) = &mut this.state.lock().log_file {
584                        for event in &mut events {
585                            json_bytes.clear();
586                            serde_json::to_writer(&mut json_bytes, event)?;
587                            file.write_all(&json_bytes)?;
588                            file.write_all(b"\n")?;
589                        }
590                    }
591
592                    let request_body = {
593                        let state = this.state.lock();
594
595                        EventRequestBody {
596                            system_id: state.system_id.as_deref().map(Into::into),
597                            installation_id: state.installation_id.as_deref().map(Into::into),
598                            session_id: state.session_id.clone(),
599                            metrics_id: state.metrics_id.as_deref().map(Into::into),
600                            is_staff: state.is_staff,
601                            app_version: state.app_version.clone(),
602                            os_name: state.os_name.clone(),
603                            os_version: state.os_version.clone(),
604                            architecture: state.architecture.to_string(),
605
606                            release_channel: state.release_channel.map(Into::into),
607                            events,
608                        }
609                    };
610
611                    let request = this.build_request(json_bytes, request_body)?;
612                    let response = this.http_client.send(request).await?;
613                    if response.status() != 200 {
614                        log::error!("Failed to send events: HTTP {:?}", response.status());
615                    }
616                    anyhow::Ok(())
617                }
618                .log_err(),
619            )
620            .detach();
621    }
622}
623
624pub fn calculate_json_checksum(json: &impl AsRef<[u8]>) -> Option<String> {
625    let Some(checksum_seed) = &*ZED_CLIENT_CHECKSUM_SEED else {
626        return None;
627    };
628
629    let mut summer = Sha256::new();
630    summer.update(checksum_seed);
631    summer.update(json);
632    summer.update(checksum_seed);
633    let mut checksum = String::new();
634    for byte in summer.finalize().as_slice() {
635        use std::fmt::Write;
636        write!(&mut checksum, "{:02x}", byte).unwrap();
637    }
638
639    Some(checksum)
640}
641
#[cfg(test)]
mod tests {
    use super::*;
    use clock::FakeSystemClock;
    use gpui::TestAppContext;
    use http_client::FakeHttpClient;

    /// Three events stay queued; the fourth reaches `max_queue_size` and
    /// flushes the queue.
    #[gpui::test]
    fn test_telemetry_flush_on_max_queue_size(cx: &mut TestAppContext) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let system_id = Some("system_id".to_string());
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);

            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(system_id, installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();
            let expected_event = Event::App(AppEvent {
                operation: operation.clone(),
            });

            for expected_queue_len in 1..=3 {
                let event = telemetry.report_app_event(operation.clone());
                assert_eq!(event, expected_event);

                {
                    let state = telemetry.state.lock();
                    assert_eq!(state.events_queue.len(), expected_queue_len);
                    assert!(state.flush_events_task.is_some());
                    assert_eq!(state.first_event_date_time, Some(first_date_time));
                }

                clock.advance(Duration::from_millis(100));
            }

            // Adding a 4th event should cause a flush
            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(event, expected_event);

            assert!(is_empty_state(&telemetry));
        });
    }

    /// A single queued event is flushed exactly when `FLUSH_INTERVAL` elapses.
    #[gpui::test]
    async fn test_telemetry_flush_on_flush_interval(
        executor: BackgroundExecutor,
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let clock = Arc::new(FakeSystemClock::new());
        let http = FakeHttpClient::with_200_response();
        let system_id = Some("system_id".to_string());
        let installation_id = Some("installation_id".to_string());
        let session_id = "session_id".to_string();

        cx.update(|cx| {
            let telemetry = Telemetry::new(clock.clone(), http, cx);
            telemetry.state.lock().max_queue_size = 4;
            telemetry.start(system_id, installation_id, session_id, cx);

            assert!(is_empty_state(&telemetry));

            let first_date_time = clock.utc_now();
            let operation = "test".to_string();

            let event = telemetry.report_app_event(operation.clone());
            assert_eq!(
                event,
                Event::App(AppEvent {
                    operation: operation.clone(),
                })
            );

            {
                let state = telemetry.state.lock();
                assert_eq!(state.events_queue.len(), 1);
                assert!(state.flush_events_task.is_some());
                assert_eq!(state.first_event_date_time, Some(first_date_time));
            }

            let one_millisecond = Duration::from_millis(1);

            // Test 1 millisecond before the flush interval limit is met
            executor.advance_clock(FLUSH_INTERVAL - one_millisecond);

            assert!(!is_empty_state(&telemetry));

            // Test the exact moment the flush interval limit is met
            executor.advance_clock(one_millisecond);

            assert!(is_empty_state(&telemetry));
        });
    }

    // TODO:
    // Test settings
    // Update FakeHTTPClient to keep track of the number of requests and assert on it

    /// Installs a test `SettingsStore` global so `Telemetry::new` can
    /// register and read its settings.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }

    /// True when nothing is queued, no flush timer is pending and no batch
    /// start time is recorded.
    fn is_empty_state(telemetry: &Telemetry) -> bool {
        let state = telemetry.state.lock();
        state.events_queue.is_empty()
            && state.flush_events_task.is_none()
            && state.first_event_date_time.is_none()
    }
}