Add repl events (#15259)

Joseph T. Lyons and Kyle Kelley created

Release Notes:

- N/A

---------

Co-authored-by: Kyle Kelley <rgbkrk@gmail.com>

Change summary

Cargo.lock                                      |  1 
crates/client/src/telemetry.rs                  | 17 +++
crates/collab/src/api/events.rs                 | 97 ++++++++++++++++--
crates/repl/Cargo.toml                          |  1 
crates/repl/src/repl.rs                         |  5 
crates/repl/src/repl_editor.rs                  |  5 
crates/repl/src/repl_store.rs                   | 13 +
crates/repl/src/session.rs                      | 64 +++++++++--
crates/telemetry_events/src/telemetry_events.rs |  8 +
crates/zed/src/main.rs                          |  6 
crates/zed/src/zed.rs                           |  6 
11 files changed, 188 insertions(+), 35 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -8657,6 +8657,7 @@ dependencies = [
  "anyhow",
  "async-dispatcher",
  "base64 0.13.1",
+ "client",
  "collections",
  "command_palette_hooks",
  "editor",

crates/client/src/telemetry.rs 🔗

@@ -18,7 +18,7 @@ use sysinfo::{CpuRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};
 use telemetry_events::{
     ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CpuEvent, EditEvent,
     EditorEvent, Event, EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent,
-    MemoryEvent, SettingEvent,
+    MemoryEvent, ReplEvent, SettingEvent,
 };
 use tempfile::NamedTempFile;
 #[cfg(not(debug_assertions))]
@@ -531,6 +531,21 @@ impl Telemetry {
         }
     }
 
+    pub fn report_repl_event(
+        self: &Arc<Self>,
+        kernel_language: String,
+        kernel_status: String,
+        repl_session_id: String,
+    ) {
+        let event = Event::Repl(ReplEvent {
+            kernel_language,
+            kernel_status,
+            repl_session_id,
+        });
+
+        self.report_event(event)
+    }
+
     fn report_event(self: &Arc<Self>, event: Event) {
         let mut state = self.state.lock();
 

crates/collab/src/api/events.rs 🔗

@@ -16,7 +16,7 @@ use sha2::{Digest, Sha256};
 use std::sync::{Arc, OnceLock};
 use telemetry_events::{
     ActionEvent, AppEvent, AssistantEvent, CallEvent, CpuEvent, EditEvent, EditorEvent, Event,
-    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent,
+    EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, ReplEvent,
     SettingEvent,
 };
 use uuid::Uuid;
@@ -518,6 +518,13 @@ pub async fn post_events(
                         checksum_matched,
                     ))
             }
+            Event::Repl(event) => to_upload.repl_events.push(ReplEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+                checksum_matched,
+            )),
         }
     }
 
@@ -542,6 +549,7 @@ struct ToUpload {
     extension_events: Vec<ExtensionEventRow>,
     edit_events: Vec<EditEventRow>,
     action_events: Vec<ActionEventRow>,
+    repl_events: Vec<ReplEventRow>,
 }
 
 impl ToUpload {
@@ -617,6 +625,11 @@ impl ToUpload {
             .await
             .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?;
 
+        const REPL_EVENTS_TABLE: &str = "repl_events";
+        Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?;
+
         Ok(())
     }
 
@@ -625,22 +638,24 @@ impl ToUpload {
         rows: &[T],
         clickhouse_client: &clickhouse::Client,
     ) -> anyhow::Result<()> {
-        if !rows.is_empty() {
-            let mut insert = clickhouse_client.insert(table)?;
-
-            for event in rows {
-                insert.write(event).await?;
-            }
+        if rows.is_empty() {
+            return Ok(());
+        }
 
-            insert.end().await?;
+        let mut insert = clickhouse_client.insert(table)?;
 
-            let event_count = rows.len();
-            log::info!(
-                "wrote {event_count} {event_specifier} to '{table}'",
-                event_specifier = if event_count == 1 { "event" } else { "events" }
-            );
+        for event in rows {
+            insert.write(event).await?;
         }
 
+        insert.end().await?;
+
+        let event_count = rows.len();
+        log::info!(
+            "wrote {event_count} {event_specifier} to '{table}'",
+            event_specifier = if event_count == 1 { "event" } else { "events" }
+        );
+
         Ok(())
     }
 }
@@ -1189,6 +1204,62 @@ impl ExtensionEventRow {
     }
 }
 
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct ReplEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    checksum_matched: bool,
+    release_channel: String,
+    os_name: String,
+    os_version: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+
+    // ReplEventRow
+    kernel_language: String,
+    kernel_status: String,
+    repl_session_id: String,
+}
+
+impl ReplEventRow {
+    fn from_event(
+        event: ReplEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+        checksum_matched: bool,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|v| v.major() as i32),
+            minor: semver.map(|v| v.minor() as i32),
+            patch: semver.map(|v| v.patch() as i32),
+            checksum_matched,
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            os_name: body.os_name.clone(),
+            os_version: body.os_version.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            kernel_language: event.kernel_language,
+            kernel_status: event.kernel_status,
+            repl_session_id: event.repl_session_id,
+        }
+    }
+}
+
 #[derive(Serialize, Debug, clickhouse::Row)]
 pub struct EditEventRow {
     // AppInfoBase

crates/repl/Cargo.toml 🔗

@@ -17,6 +17,7 @@ alacritty_terminal.workspace = true
 anyhow.workspace = true
 async-dispatcher.workspace = true
 base64.workspace = true
+client.workspace = true
 collections.workspace = true
 command_palette_hooks.workspace = true
 editor.workspace = true

crates/repl/src/repl.rs 🔗

@@ -24,13 +24,14 @@ pub use crate::repl_sessions_ui::{
 };
 use crate::repl_store::ReplStore;
 pub use crate::session::Session;
+use client::telemetry::Telemetry;
 
-pub fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
+pub fn init(fs: Arc<dyn Fs>, telemetry: Arc<Telemetry>, cx: &mut AppContext) {
     set_dispatcher(zed_dispatcher(cx));
     JupyterSettings::register(cx);
     ::editor::init_settings(cx);
     repl_sessions_ui::init(cx);
-    ReplStore::init(fs, cx);
+    ReplStore::init(fs, telemetry, cx);
 }
 
 fn zed_dispatcher(cx: &mut AppContext) -> impl Dispatcher {

crates/repl/src/repl_editor.rs 🔗

@@ -41,12 +41,15 @@ pub fn run(editor: WeakView<Editor>, cx: &mut WindowContext) -> Result<()> {
         })?;
 
         let fs = store.read(cx).fs().clone();
+        let telemetry = store.read(cx).telemetry().clone();
+
         let session = if let Some(session) = store.read(cx).get_session(editor.entity_id()).cloned()
         {
             session
         } else {
             let weak_editor = editor.downgrade();
-            let session = cx.new_view(|cx| Session::new(weak_editor, fs, kernel_specification, cx));
+            let session = cx
+                .new_view(|cx| Session::new(weak_editor, fs, telemetry, kernel_specification, cx));
 
             editor.update(cx, |_editor, cx| {
                 cx.notify();

crates/repl/src/repl_store.rs 🔗

@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use anyhow::Result;
+use client::telemetry::Telemetry;
 use collections::HashMap;
 use command_palette_hooks::CommandPaletteFilter;
 use gpui::{
@@ -22,14 +23,15 @@ pub struct ReplStore {
     enabled: bool,
     sessions: HashMap<EntityId, View<Session>>,
     kernel_specifications: Vec<KernelSpecification>,
+    telemetry: Arc<Telemetry>,
     _subscriptions: Vec<Subscription>,
 }
 
 impl ReplStore {
     const NAMESPACE: &'static str = "repl";
 
-    pub(crate) fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
-        let store = cx.new_model(move |cx| Self::new(fs, cx));
+    pub(crate) fn init(fs: Arc<dyn Fs>, telemetry: Arc<Telemetry>, cx: &mut AppContext) {
+        let store = cx.new_model(move |cx| Self::new(fs, telemetry, cx));
 
         store
             .update(cx, |store, cx| store.refresh_kernelspecs(cx))
@@ -42,13 +44,14 @@ impl ReplStore {
         cx.global::<GlobalReplStore>().0.clone()
     }
 
-    pub fn new(fs: Arc<dyn Fs>, cx: &mut ModelContext<Self>) -> Self {
+    pub fn new(fs: Arc<dyn Fs>, telemetry: Arc<Telemetry>, cx: &mut ModelContext<Self>) -> Self {
         let subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
             this.set_enabled(JupyterSettings::enabled(cx), cx);
         })];
 
         let this = Self {
             fs,
+            telemetry,
             enabled: JupyterSettings::enabled(cx),
             sessions: HashMap::default(),
             kernel_specifications: Vec::new(),
@@ -62,6 +65,10 @@ impl ReplStore {
         &self.fs
     }
 
+    pub fn telemetry(&self) -> &Arc<Telemetry> {
+        &self.telemetry
+    }
+
     pub fn is_enabled(&self) -> bool {
         self.enabled
     }

crates/repl/src/session.rs 🔗

@@ -1,8 +1,10 @@
 use crate::components::KernelListItem;
+use crate::KernelStatus;
 use crate::{
     kernels::{Kernel, KernelSpecification, RunningKernel},
     outputs::{ExecutionStatus, ExecutionView, LineHeight as _},
 };
+use client::telemetry::Telemetry;
 use collections::{HashMap, HashSet};
 use editor::{
     display_map::{
@@ -34,6 +36,7 @@ pub struct Session {
     blocks: HashMap<String, EditorBlock>,
     messaging_task: Task<()>,
     pub kernel_specification: KernelSpecification,
+    telemetry: Arc<Telemetry>,
     _buffer_subscription: Subscription,
 }
 
@@ -205,9 +208,18 @@ impl Session {
     pub fn new(
         editor: WeakView<Editor>,
         fs: Arc<dyn Fs>,
+        telemetry: Arc<Telemetry>,
         kernel_specification: KernelSpecification,
         cx: &mut ViewContext<Self>,
     ) -> Self {
+        let kernel_language = kernel_specification.kernelspec.language.clone();
+
+        telemetry.report_repl_event(
+            kernel_language.clone(),
+            KernelStatus::Starting.to_string(),
+            cx.entity_id().to_string(),
+        );
+
         let entity_id = editor.entity_id();
         let working_directory = editor
             .upgrade()
@@ -227,11 +239,11 @@ impl Session {
 
                 match kernel {
                     Ok((mut kernel, mut messages_rx)) => {
-                        this.update(&mut cx, |this, cx| {
+                        this.update(&mut cx, |session, cx| {
                             // At this point we can create a new kind of kernel that has the process and our long running background tasks
 
                             let status = kernel.process.status();
-                            this.kernel = Kernel::RunningKernel(kernel);
+                            session.kernel(Kernel::RunningKernel(kernel), cx);
 
                             cx.spawn(|session, mut cx| async move {
                                 let error_message = match status.await {
@@ -252,8 +264,10 @@ impl Session {
 
                                 session
                                     .update(&mut cx, |session, cx| {
-                                        session.kernel =
-                                            Kernel::ErroredLaunch(error_message.clone());
+                                        session.kernel(
+                                            Kernel::ErroredLaunch(error_message.clone()),
+                                            cx,
+                                        );
 
                                         session.blocks.values().for_each(|block| {
                                             block.execution_view.update(
@@ -282,7 +296,7 @@ impl Session {
                             })
                             .detach();
 
-                            this.messaging_task = cx.spawn(|session, mut cx| async move {
+                            session.messaging_task = cx.spawn(|session, mut cx| async move {
                                 while let Some(message) = messages_rx.next().await {
                                     session
                                         .update(&mut cx, |session, cx| {
@@ -295,8 +309,8 @@ impl Session {
                         .ok();
                     }
                     Err(err) => {
-                        this.update(&mut cx, |this, _cx| {
-                            this.kernel = Kernel::ErroredLaunch(err.to_string());
+                        this.update(&mut cx, |session, cx| {
+                            session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
                         })
                         .ok();
                     }
@@ -319,6 +333,7 @@ impl Session {
             blocks: HashMap::default(),
             kernel_specification,
             _buffer_subscription: subscription,
+            telemetry,
         };
     }
 
@@ -474,8 +489,8 @@ impl Session {
 
                 cx.spawn(|this, mut cx| async move {
                     task.await;
-                    this.update(&mut cx, |this, cx| {
-                        this.send(message, cx).ok();
+                    this.update(&mut cx, |session, cx| {
+                        session.send(message, cx).ok();
                     })
                     .ok();
                 })
@@ -501,6 +516,13 @@ impl Session {
         match &message.content {
             JupyterMessageContent::Status(status) => {
                 self.kernel.set_execution_state(&status.execution_state);
+
+                self.telemetry.report_repl_event(
+                    self.kernel_specification.kernelspec.language.clone(),
+                    KernelStatus::from(&self.kernel).to_string(),
+                    cx.entity_id().to_string(),
+                );
+
                 cx.notify();
             }
             JupyterMessageContent::KernelInfoReply(reply) => {
@@ -528,6 +550,23 @@ impl Session {
         }
     }
 
+    pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
+        if let Kernel::Shutdown = kernel {
+            cx.emit(SessionEvent::Shutdown(self.editor.clone()));
+        }
+
+        let kernel_status = KernelStatus::from(&kernel).to_string();
+        let kernel_language = self.kernel_specification.kernelspec.language.clone();
+
+        self.telemetry.report_repl_event(
+            kernel_language,
+            kernel_status,
+            cx.entity_id().to_string(),
+        );
+
+        self.kernel = kernel;
+    }
+
     pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
         let kernel = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);
 
@@ -544,10 +583,9 @@ impl Session {
 
                     kernel.process.kill().ok();
 
-                    this.update(&mut cx, |this, cx| {
-                        cx.emit(SessionEvent::Shutdown(this.editor.clone()));
-                        this.clear_outputs(cx);
-                        this.kernel = Kernel::Shutdown;
+                    this.update(&mut cx, |session, cx| {
+                        session.clear_outputs(cx);
+                        session.kernel(Kernel::Shutdown, cx);
                         cx.notify();
                     })
                     .ok();

crates/telemetry_events/src/telemetry_events.rs 🔗

@@ -65,6 +65,7 @@ pub enum Event {
     Extension(ExtensionEvent),
     Edit(EditEvent),
     Action(ActionEvent),
+    Repl(ReplEvent),
 }
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -148,6 +149,13 @@ pub struct AppEvent {
     pub operation: String,
 }
 
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct ReplEvent {
+    pub kernel_language: String,
+    pub kernel_status: String,
+    pub repl_session_id: String,
+}
+
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct BacktraceFrame {
     pub ip: usize,

crates/zed/src/main.rs 🔗

@@ -169,7 +169,11 @@ fn init_common(app_state: Arc<AppState>, cx: &mut AppContext) {
     supermaven::init(app_state.client.clone(), cx);
     inline_completion_registry::init(app_state.client.telemetry().clone(), cx);
     assistant::init(app_state.fs.clone(), app_state.client.clone(), cx);
-    repl::init(app_state.fs.clone(), cx);
+    repl::init(
+        app_state.fs.clone(),
+        app_state.client.telemetry().clone(),
+        cx,
+    );
     extension::init(
         app_state.fs.clone(),
         app_state.client.clone(),

crates/zed/src/zed.rs 🔗

@@ -3459,7 +3459,11 @@ mod tests {
             terminal_view::init(cx);
             language_model::init(app_state.client.clone(), cx);
             assistant::init(app_state.fs.clone(), app_state.client.clone(), cx);
-            repl::init(app_state.fs.clone(), cx);
+            repl::init(
+                app_state.fs.clone(),
+                app_state.client.telemetry().clone(),
+                cx,
+            );
             tasks_ui::init(cx);
             initialize_workspace(app_state.clone(), cx);
             app_state