telemetry.rs

  1mod event_coalescer;
  2
  3use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  4use chrono::{DateTime, Utc};
  5use futures::Future;
  6use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
  7use lazy_static::lazy_static;
  8use parking_lot::Mutex;
  9use serde::Serialize;
 10use settings::{Settings, SettingsStore};
 11use std::{env, io::Write, mem, path::PathBuf, sync::Arc, time::Duration};
 12use sysinfo::{
 13    CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 14};
 15use tempfile::NamedTempFile;
 16use util::http::HttpClient;
 17#[cfg(not(debug_assertions))]
 18use util::ResultExt;
 19use util::{channel::ReleaseChannel, TryFutureExt};
 20
 21use self::event_coalescer::EventCoalescer;
 22
 23pub struct Telemetry {
 24    http_client: Arc<dyn HttpClient>,
 25    executor: BackgroundExecutor,
 26    state: Arc<Mutex<TelemetryState>>,
 27}
 28
 29struct TelemetryState {
 30    settings: TelemetrySettings,
 31    metrics_id: Option<Arc<str>>,      // Per logged-in user
 32    installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
 33    session_id: Option<Arc<str>>,      // Per app launch
 34    release_channel: Option<&'static str>,
 35    app_metadata: AppMetadata,
 36    architecture: &'static str,
 37    events_queue: Vec<EventWrapper>,
 38    flush_events_task: Option<Task<()>>,
 39    log_file: Option<NamedTempFile>,
 40    is_staff: Option<bool>,
 41    first_event_datetime: Option<DateTime<Utc>>,
 42    event_coalescer: EventCoalescer,
 43}
 44
 45const EVENTS_URL_PATH: &'static str = "/api/events";
 46
 47lazy_static! {
 48    static ref EVENTS_URL: String = format!("{}{}", *ZED_SERVER_URL, EVENTS_URL_PATH);
 49}
 50
 51#[derive(Serialize, Debug)]
 52struct EventRequestBody {
 53    token: &'static str,
 54    installation_id: Option<Arc<str>>,
 55    session_id: Option<Arc<str>>,
 56    is_staff: Option<bool>,
 57    app_version: Option<String>,
 58    os_name: &'static str,
 59    os_version: Option<String>,
 60    architecture: &'static str,
 61    release_channel: Option<&'static str>,
 62    events: Vec<EventWrapper>,
 63}
 64
 65#[derive(Serialize, Debug)]
 66struct EventWrapper {
 67    signed_in: bool,
 68    #[serde(flatten)]
 69    event: Event,
 70}
 71
 72#[derive(Serialize, Debug)]
 73#[serde(rename_all = "snake_case")]
 74pub enum AssistantKind {
 75    Panel,
 76    Inline,
 77}
 78
 79#[derive(Serialize, Debug)]
 80#[serde(tag = "type")]
 81pub enum Event {
 82    Editor {
 83        operation: &'static str,
 84        file_extension: Option<String>,
 85        vim_mode: bool,
 86        copilot_enabled: bool,
 87        copilot_enabled_for_language: bool,
 88        milliseconds_since_first_event: i64,
 89    },
 90    Copilot {
 91        suggestion_id: Option<String>,
 92        suggestion_accepted: bool,
 93        file_extension: Option<String>,
 94        milliseconds_since_first_event: i64,
 95    },
 96    Call {
 97        operation: &'static str,
 98        room_id: Option<u64>,
 99        channel_id: Option<u64>,
100        milliseconds_since_first_event: i64,
101    },
102    Assistant {
103        conversation_id: Option<String>,
104        kind: AssistantKind,
105        model: &'static str,
106        milliseconds_since_first_event: i64,
107    },
108    Cpu {
109        usage_as_percentage: f32,
110        core_count: u32,
111        milliseconds_since_first_event: i64,
112    },
113    Memory {
114        memory_in_bytes: u64,
115        virtual_memory_in_bytes: u64,
116        milliseconds_since_first_event: i64,
117    },
118    App {
119        operation: String,
120        milliseconds_since_first_event: i64,
121    },
122    Setting {
123        setting: &'static str,
124        value: String,
125        milliseconds_since_first_event: i64,
126    },
127    Edit {
128        duration: i64,
129        environment: &'static str,
130        milliseconds_since_first_event: i64,
131    },
132    Action {
133        source: &'static str,
134        action: String,
135        milliseconds_since_first_event: i64,
136    },
137}
138
139#[cfg(debug_assertions)]
140const MAX_QUEUE_LEN: usize = 5;
141
142#[cfg(not(debug_assertions))]
143const MAX_QUEUE_LEN: usize = 50;
144
145#[cfg(debug_assertions)]
146const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
147
148#[cfg(not(debug_assertions))]
149const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
150
151impl Telemetry {
152    pub fn new(client: Arc<dyn HttpClient>, cx: &mut AppContext) -> Arc<Self> {
153        let release_channel = cx
154            .try_global::<ReleaseChannel>()
155            .map(|release_channel| release_channel.display_name());
156
157        TelemetrySettings::register(cx);
158
159        let state = Arc::new(Mutex::new(TelemetryState {
160            settings: TelemetrySettings::get_global(cx).clone(),
161            app_metadata: cx.app_metadata(),
162            architecture: env::consts::ARCH,
163            release_channel,
164            installation_id: None,
165            metrics_id: None,
166            session_id: None,
167            events_queue: Vec::new(),
168            flush_events_task: None,
169            log_file: None,
170            is_staff: None,
171            first_event_datetime: None,
172            event_coalescer: EventCoalescer::new(),
173        }));
174
175        #[cfg(not(debug_assertions))]
176        cx.background_executor()
177            .spawn({
178                let state = state.clone();
179                async move {
180                    if let Some(tempfile) =
181                        NamedTempFile::new_in(util::paths::CONFIG_DIR.as_path()).log_err()
182                    {
183                        state.lock().log_file = Some(tempfile);
184                    }
185                }
186            })
187            .detach();
188
189        cx.observe_global::<SettingsStore>({
190            let state = state.clone();
191
192            move |cx| {
193                let mut state = state.lock();
194                state.settings = TelemetrySettings::get_global(cx).clone();
195            }
196        })
197        .detach();
198
199        // TODO: Replace all hardware stuff with nested SystemSpecs json
200        let this = Arc::new(Self {
201            http_client: client,
202            executor: cx.background_executor().clone(),
203            state,
204        });
205
206        // We should only ever have one instance of Telemetry, leak the subscription to keep it alive
207        // rather than store in TelemetryState, complicating spawn as subscriptions are not Send
208        std::mem::forget(cx.on_app_quit({
209            let this = this.clone();
210            move |_| this.shutdown_telemetry()
211        }));
212
213        this
214    }
215
216    #[cfg(any(test, feature = "test-support"))]
217    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
218        Task::ready(())
219    }
220
221    // Skip calling this function in tests.
222    // TestAppContext ends up calling this function on shutdown and it panics when trying to find the TelemetrySettings
223    #[cfg(not(any(test, feature = "test-support")))]
224    fn shutdown_telemetry(self: &Arc<Self>) -> impl Future<Output = ()> {
225        self.report_app_event("close".to_string());
226        // TODO: close final edit period and make sure it's sent
227        Task::ready(())
228    }
229
230    pub fn log_file_path(&self) -> Option<PathBuf> {
231        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
232    }
233
234    pub fn start(
235        self: &Arc<Self>,
236        installation_id: Option<String>,
237        session_id: String,
238        cx: &mut AppContext,
239    ) {
240        let mut state = self.state.lock();
241        state.installation_id = installation_id.map(|id| id.into());
242        state.session_id = Some(session_id.into());
243        drop(state);
244
245        let this = self.clone();
246        cx.spawn(|_| async move {
247            // Avoiding calling `System::new_all()`, as there have been crashes related to it
248            let refresh_kind = RefreshKind::new()
249                .with_memory() // For memory usage
250                .with_processes(ProcessRefreshKind::everything()) // For process usage
251                .with_cpu(CpuRefreshKind::everything()); // For core count
252
253            let mut system = System::new_with_specifics(refresh_kind);
254
255            // Avoiding calling `refresh_all()`, just update what we need
256            system.refresh_specifics(refresh_kind);
257
258            // Waiting some amount of time before the first query is important to get a reasonable value
259            // https://docs.rs/sysinfo/0.29.10/sysinfo/trait.ProcessExt.html#tymethod.cpu_usage
260            const DURATION_BETWEEN_SYSTEM_EVENTS: Duration = Duration::from_secs(4 * 60);
261
262            loop {
263                smol::Timer::after(DURATION_BETWEEN_SYSTEM_EVENTS).await;
264
265                system.refresh_specifics(refresh_kind);
266
267                let current_process = Pid::from_u32(std::process::id());
268                let Some(process) = system.processes().get(&current_process) else {
269                    let process = current_process;
270                    log::error!("Failed to find own process {process:?} in system process table");
271                    // TODO: Fire an error telemetry event
272                    return;
273                };
274
275                this.report_memory_event(process.memory(), process.virtual_memory());
276                this.report_cpu_event(process.cpu_usage(), system.cpus().len() as u32);
277            }
278        })
279        .detach();
280    }
281
282    pub fn set_authenticated_user_info(
283        self: &Arc<Self>,
284        metrics_id: Option<String>,
285        is_staff: bool,
286    ) {
287        let mut state = self.state.lock();
288
289        if !state.settings.metrics {
290            return;
291        }
292
293        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
294        state.metrics_id = metrics_id.clone();
295        state.is_staff = Some(is_staff);
296        drop(state);
297    }
298
299    pub fn report_editor_event(
300        self: &Arc<Self>,
301        file_extension: Option<String>,
302        vim_mode: bool,
303        operation: &'static str,
304        copilot_enabled: bool,
305        copilot_enabled_for_language: bool,
306    ) {
307        let event = Event::Editor {
308            file_extension,
309            vim_mode,
310            operation,
311            copilot_enabled,
312            copilot_enabled_for_language,
313            milliseconds_since_first_event: self.milliseconds_since_first_event(),
314        };
315
316        self.report_event(event)
317    }
318
319    pub fn report_copilot_event(
320        self: &Arc<Self>,
321        suggestion_id: Option<String>,
322        suggestion_accepted: bool,
323        file_extension: Option<String>,
324    ) {
325        let event = Event::Copilot {
326            suggestion_id,
327            suggestion_accepted,
328            file_extension,
329            milliseconds_since_first_event: self.milliseconds_since_first_event(),
330        };
331
332        self.report_event(event)
333    }
334
335    pub fn report_assistant_event(
336        self: &Arc<Self>,
337        conversation_id: Option<String>,
338        kind: AssistantKind,
339        model: &'static str,
340    ) {
341        let event = Event::Assistant {
342            conversation_id,
343            kind,
344            model,
345            milliseconds_since_first_event: self.milliseconds_since_first_event(),
346        };
347
348        self.report_event(event)
349    }
350
351    pub fn report_call_event(
352        self: &Arc<Self>,
353        operation: &'static str,
354        room_id: Option<u64>,
355        channel_id: Option<u64>,
356    ) {
357        let event = Event::Call {
358            operation,
359            room_id,
360            channel_id,
361            milliseconds_since_first_event: self.milliseconds_since_first_event(),
362        };
363
364        self.report_event(event)
365    }
366
367    pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
368        let event = Event::Cpu {
369            usage_as_percentage,
370            core_count,
371            milliseconds_since_first_event: self.milliseconds_since_first_event(),
372        };
373
374        self.report_event(event)
375    }
376
377    pub fn report_memory_event(
378        self: &Arc<Self>,
379        memory_in_bytes: u64,
380        virtual_memory_in_bytes: u64,
381    ) {
382        let event = Event::Memory {
383            memory_in_bytes,
384            virtual_memory_in_bytes,
385            milliseconds_since_first_event: self.milliseconds_since_first_event(),
386        };
387
388        self.report_event(event)
389    }
390
391    pub fn report_app_event(self: &Arc<Self>, operation: String) {
392        let event = Event::App {
393            operation,
394            milliseconds_since_first_event: self.milliseconds_since_first_event(),
395        };
396
397        self.report_event(event)
398    }
399
400    pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
401        let event = Event::Setting {
402            setting,
403            value,
404            milliseconds_since_first_event: self.milliseconds_since_first_event(),
405        };
406
407        self.report_event(event)
408    }
409
410    pub fn log_edit_event(self: &Arc<Self>, environment: &'static str) {
411        let mut state = self.state.lock();
412        let period_data = state.event_coalescer.log_event(environment);
413        drop(state);
414
415        if let Some((start, end, environment)) = period_data {
416            let event = Event::Edit {
417                duration: end.timestamp_millis() - start.timestamp_millis(),
418                environment,
419                milliseconds_since_first_event: self.milliseconds_since_first_event(),
420            };
421
422            self.report_event(event);
423        }
424    }
425
426    pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
427        let event = Event::Action {
428            source,
429            action,
430            milliseconds_since_first_event: self.milliseconds_since_first_event(),
431        };
432
433        self.report_event(event)
434    }
435
436    fn milliseconds_since_first_event(&self) -> i64 {
437        let mut state = self.state.lock();
438
439        match state.first_event_datetime {
440            Some(first_event_datetime) => {
441                let now: DateTime<Utc> = Utc::now();
442                now.timestamp_millis() - first_event_datetime.timestamp_millis()
443            }
444            None => {
445                state.first_event_datetime = Some(Utc::now());
446                0
447            }
448        }
449    }
450
451    fn report_event(self: &Arc<Self>, event: Event) {
452        let mut state = self.state.lock();
453
454        if !state.settings.metrics {
455            return;
456        }
457
458        if state.flush_events_task.is_none() {
459            let this = self.clone();
460            let executor = self.executor.clone();
461            state.flush_events_task = Some(self.executor.spawn(async move {
462                executor.timer(FLUSH_INTERVAL).await;
463                this.flush_events();
464            }));
465        }
466
467        let signed_in = state.metrics_id.is_some();
468        state.events_queue.push(EventWrapper { signed_in, event });
469
470        if state.installation_id.is_some() {
471            if state.events_queue.len() >= MAX_QUEUE_LEN {
472                drop(state);
473                self.flush_events();
474            }
475        }
476    }
477
478    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
479        self.state.lock().metrics_id.clone()
480    }
481
482    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
483        self.state.lock().installation_id.clone()
484    }
485
486    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
487        self.state.lock().is_staff
488    }
489
490    pub fn flush_events(self: &Arc<Self>) {
491        let mut state = self.state.lock();
492        state.first_event_datetime = None;
493        let mut events = mem::take(&mut state.events_queue);
494        state.flush_events_task.take();
495        drop(state);
496        if events.is_empty() {
497            return;
498        }
499
500        let this = self.clone();
501        self.executor
502            .spawn(
503                async move {
504                    let mut json_bytes = Vec::new();
505
506                    if let Some(file) = &mut this.state.lock().log_file {
507                        let file = file.as_file_mut();
508                        for event in &mut events {
509                            json_bytes.clear();
510                            serde_json::to_writer(&mut json_bytes, event)?;
511                            file.write_all(&json_bytes)?;
512                            file.write(b"\n")?;
513                        }
514                    }
515
516                    {
517                        let state = this.state.lock();
518                        let request_body = EventRequestBody {
519                            token: ZED_SECRET_CLIENT_TOKEN,
520                            installation_id: state.installation_id.clone(),
521                            session_id: state.session_id.clone(),
522                            is_staff: state.is_staff.clone(),
523                            app_version: state
524                                .app_metadata
525                                .app_version
526                                .map(|version| version.to_string()),
527                            os_name: state.app_metadata.os_name,
528                            os_version: state
529                                .app_metadata
530                                .os_version
531                                .map(|version| version.to_string()),
532                            architecture: state.architecture,
533
534                            release_channel: state.release_channel,
535                            events,
536                        };
537                        json_bytes.clear();
538                        serde_json::to_writer(&mut json_bytes, &request_body)?;
539                    }
540
541                    this.http_client
542                        .post_json(EVENTS_URL.as_str(), json_bytes.into())
543                        .await?;
544                    anyhow::Ok(())
545                }
546                .log_err(),
547            )
548            .detach();
549    }
550}