repl: Initial stdin support for kernels (#48851)

Kyle Kelley created

Support stdin from Jupyter kernels AKA `input()` and `getpass()` in
IPython.

<img width="460" height="380" alt="image"
src="https://github.com/user-attachments/assets/3b26457f-d651-4514-94b4-8cbb6ff52003"
/>

<img width="391" height="243" alt="image"
src="https://github.com/user-attachments/assets/255bd388-5622-4521-ab93-f9ef7fe861aa"
/>

Closes #22746

Release Notes:

- Added STDIN support (`input` / `raw_input`) to REPL

Change summary

Cargo.lock                                |   8 
Cargo.toml                                |   4 
crates/repl/src/kernels/mod.rs            |   1 
crates/repl/src/kernels/native_kernel.rs  |  39 +++
crates/repl/src/kernels/remote_kernels.rs |   9 
crates/repl/src/outputs.rs                | 246 ++++++++++++++++++++++++
crates/repl/src/session.rs                |  37 +++
7 files changed, 329 insertions(+), 15 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -8838,9 +8838,9 @@ dependencies = [
 
 [[package]]
 name = "jupyter-protocol"
-version = "1.1.1"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "073486929b8271fc18bd001fb8604f4b4d88c0fae134b88ed943c46c8826d9eb"
+checksum = "5fecdcf39420574a8df6fa5758cecafa99a4af93a80ca2a9a96596f9b301e3a5"
 dependencies = [
  "async-trait",
  "bytes 1.11.1",
@@ -14237,9 +14237,9 @@ dependencies = [
 
 [[package]]
 name = "runtimelib"
-version = "1.1.0"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25a8031614aa3913648d167bc69e2b9fda7731f2226ef588b50323c392bfeb58"
+checksum = "d80685459e1e5fa5603182058351ae91c98ca458dfef4e85f0a37be4f7cf1e6c"
 dependencies = [
  "async-dispatcher",
  "async-std",

Cargo.toml 🔗

@@ -553,7 +553,7 @@ itertools = "0.14.0"
 json_dotpath = "1.1"
 jsonschema = "0.37.0"
 jsonwebtoken = "10.0"
-jupyter-protocol = "1.1.1"
+jupyter-protocol = "1.2.0"
 jupyter-websocket-client = "1.0.0"
 libc = "0.2"
 libsqlite3-sys = { version = "0.30.1", features = ["bundled"] }
@@ -636,7 +636,7 @@ reqwest = { git = "https://github.com/zed-industries/reqwest.git", rev = "c15662
     "stream",
 ], package = "zed-reqwest", version = "0.12.15-zed" }
 rsa = "0.9.6"
-runtimelib = { version = "1.1.0", default-features = false, features = [
+runtimelib = { version = "1.2.0", default-features = false, features = [
     "async-dispatcher-runtime", "aws-lc-rs"
 ] }
 rust-embed = { version = "8.4", features = ["include-exclude"] }

crates/repl/src/kernels/mod.rs 🔗

@@ -230,6 +230,7 @@ pub fn python_env_kernel_specifications(
 
 pub trait RunningKernel: Send + Debug {
     fn request_tx(&self) -> mpsc::Sender<JupyterMessage>;
+    fn stdin_tx(&self) -> mpsc::Sender<JupyterMessage>;
     fn working_directory(&self) -> &PathBuf;
     fn execution_state(&self) -> &ExecutionState;
     fn set_execution_state(&mut self, state: ExecutionState);

crates/repl/src/kernels/native_kernel.rs 🔗

@@ -90,6 +90,7 @@ pub struct NativeRunningKernel {
     _process_status_task: Option<Task<()>>,
     pub working_directory: PathBuf,
     pub request_tx: mpsc::Sender<JupyterMessage>,
+    pub stdin_tx: mpsc::Sender<JupyterMessage>,
     pub execution_state: ExecutionState,
     pub kernel_info: Option<KernelInfoReply>,
 }
@@ -154,22 +155,39 @@ impl NativeRunningKernel {
             let iopub_socket =
                 runtimelib::create_client_iopub_connection(&connection_info, "", &session_id)
                     .await?;
-            let shell_socket =
-                runtimelib::create_client_shell_connection(&connection_info, &session_id).await?;
             let control_socket =
                 runtimelib::create_client_control_connection(&connection_info, &session_id).await?;
 
+            let peer_identity = runtimelib::peer_identity_for_session(&session_id)?;
+            let shell_socket =
+                runtimelib::create_client_shell_connection_with_identity(
+                    &connection_info,
+                    &session_id,
+                    peer_identity.clone(),
+                )
+                .await?;
+            let stdin_socket = runtimelib::create_client_stdin_connection_with_identity(
+                &connection_info,
+                &session_id,
+                peer_identity,
+            )
+            .await?;
+
             let (mut shell_send, shell_recv) = shell_socket.split();
             let (mut control_send, control_recv) = control_socket.split();
+            let (mut stdin_send, stdin_recv) = stdin_socket.split();
 
             let (request_tx, mut request_rx) =
                 futures::channel::mpsc::channel::<JupyterMessage>(100);
+            let (stdin_tx, mut stdin_rx) =
+                futures::channel::mpsc::channel::<JupyterMessage>(100);
 
             let recv_task = cx.spawn({
                 let session = session.clone();
                 let mut iopub = iopub_socket;
                 let mut shell = shell_recv;
                 let mut control = control_recv;
+                let mut stdin = stdin_recv;
 
                 async move |cx| -> anyhow::Result<()> {
                     loop {
@@ -177,6 +195,7 @@ impl NativeRunningKernel {
                             msg = iopub.read().fuse() => ("iopub", msg),
                             msg = shell.read().fuse() => ("shell", msg),
                             msg = control.read().fuse() => ("control", msg),
+                            msg = stdin.read().fuse() => ("stdin", msg),
                         };
                         match result {
                             Ok(message) => {
@@ -252,6 +271,15 @@ impl NativeRunningKernel {
                 }
             });
 
+            let stdin_routing_task = cx.background_spawn({
+                async move {
+                    while let Some(message) = stdin_rx.next().await {
+                        stdin_send.send(message).await?;
+                    }
+                    anyhow::Ok(())
+                }
+            });
+
             let stderr = process.stderr.take();
             let stdout = process.stdout.take();
 
@@ -294,6 +322,7 @@ impl NativeRunningKernel {
                     let mut tasks = FuturesUnordered::new();
                     tasks.push(with_name("recv task", recv_task));
                     tasks.push(with_name("routing task", routing_task));
+                    tasks.push(with_name("stdin routing task", stdin_routing_task));
 
                     while let Some((name, result)) = tasks.next().await {
                         if let Err(err) = result {
@@ -341,6 +370,7 @@ impl NativeRunningKernel {
             anyhow::Ok(Box::new(Self {
                 process,
                 request_tx,
+                stdin_tx,
                 working_directory,
                 _process_status_task: Some(process_status_task),
                 connection_path,
@@ -356,6 +386,10 @@ impl RunningKernel for NativeRunningKernel {
         self.request_tx.clone()
     }
 
+    fn stdin_tx(&self) -> mpsc::Sender<JupyterMessage> {
+        self.stdin_tx.clone()
+    }
+
     fn working_directory(&self) -> &PathBuf {
         &self.working_directory
     }
@@ -384,6 +418,7 @@ impl RunningKernel for NativeRunningKernel {
     fn kill(&mut self) {
         self._process_status_task.take();
         self.request_tx.close_channel();
+        self.stdin_tx.close_channel();
         self.process.kill().ok();
     }
 }

crates/repl/src/kernels/remote_kernels.rs 🔗

@@ -119,6 +119,7 @@ pub struct RemoteRunningKernel {
     http_client: Arc<dyn HttpClient>,
     pub working_directory: std::path::PathBuf,
     pub request_tx: mpsc::Sender<JupyterMessage>,
+    pub stdin_tx: mpsc::Sender<JupyterMessage>,
     pub execution_state: ExecutionState,
     pub kernel_info: Option<KernelInfoReply>,
     pub kernel_id: String,
@@ -211,12 +212,15 @@ impl RemoteRunningKernel {
                 }
             });
 
+            let stdin_tx = request_tx.clone();
+
             anyhow::Ok(Box::new(Self {
                 _routing_task: routing_task,
                 _receiving_task: receiving_task,
                 remote_server,
                 working_directory,
                 request_tx,
+                stdin_tx,
                 // todo(kyle): pull this from the kernel API to start with
                 execution_state: ExecutionState::Idle,
                 kernel_info: None,
@@ -245,6 +249,10 @@ impl RunningKernel for RemoteRunningKernel {
         self.request_tx.clone()
     }
 
+    fn stdin_tx(&self) -> futures::channel::mpsc::Sender<runtimelib::JupyterMessage> {
+        self.stdin_tx.clone()
+    }
+
     fn working_directory(&self) -> &std::path::PathBuf {
         &self.working_directory
     }
@@ -292,5 +300,6 @@ impl RunningKernel for RemoteRunningKernel {
 
     fn kill(&mut self) {
         self.request_tx.close_channel();
+        self.stdin_tx.close_channel();
     }
 }

crates/repl/src/outputs.rs 🔗

@@ -36,7 +36,8 @@
 use editor::{Editor, MultiBuffer};
 use gpui::{AnyElement, ClipboardItem, Entity, EventEmitter, Render, WeakEntity};
 use language::Buffer;
-use runtimelib::{ExecutionState, JupyterMessageContent, MimeBundle, MimeType};
+use menu;
+use runtimelib::{ExecutionState, JupyterMessage, JupyterMessageContent, MimeBundle, MimeType};
 use ui::{CommonAnimationExt, CopyButton, IconButton, Tooltip, prelude::*};
 
 mod image;
@@ -441,6 +442,18 @@ pub enum ExecutionStatus {
 pub struct ExecutionViewFinishedEmpty;
 pub struct ExecutionViewFinishedSmall(pub String);
 
+pub struct InputReplyEvent {
+    pub value: String,
+    pub parent_message: JupyterMessage,
+}
+
+struct PendingInput {
+    prompt: String,
+    password: bool,
+    editor: Entity<Editor>,
+    parent_message: JupyterMessage,
+}
+
 /// An ExecutionView shows the outputs of an execution.
 /// It can hold zero or more outputs, which the user
 /// sees as "the output" for a single execution.
@@ -449,10 +462,12 @@ pub struct ExecutionView {
     workspace: WeakEntity<Workspace>,
     pub outputs: Vec<Output>,
     pub status: ExecutionStatus,
+    pending_input: Option<PendingInput>,
 }
 
 impl EventEmitter<ExecutionViewFinishedEmpty> for ExecutionView {}
 impl EventEmitter<ExecutionViewFinishedSmall> for ExecutionView {}
+impl EventEmitter<InputReplyEvent> for ExecutionView {}
 
 impl ExecutionView {
     pub fn new(
@@ -464,6 +479,56 @@ impl ExecutionView {
             workspace,
             outputs: Default::default(),
             status,
+            pending_input: None,
+        }
+    }
+
+    fn submit_input(&mut self, _window: &mut Window, cx: &mut Context<Self>) {
+        if let Some(pending_input) = self.pending_input.take() {
+            let value = pending_input.editor.read(cx).text(cx);
+
+            let display_text = if pending_input.password {
+                format!("{}{}", pending_input.prompt, "*".repeat(value.len()))
+            } else {
+                format!("{}{}", pending_input.prompt, value)
+            };
+            self.outputs.push(Output::Message(display_text));
+
+            cx.emit(InputReplyEvent {
+                value,
+                parent_message: pending_input.parent_message,
+            });
+            cx.notify();
+        }
+    }
+
+    /// Handle an InputRequest message, storing the full message for replying
+    pub fn handle_input_request(
+        &mut self,
+        message: &JupyterMessage,
+        window: &mut Window,
+        cx: &mut Context<Self>,
+    ) {
+        if let JupyterMessageContent::InputRequest(input_request) = &message.content {
+            let prompt = input_request.prompt.clone();
+            let password = input_request.password;
+
+            let editor = cx.new(|cx| {
+                let mut editor = Editor::single_line(window, cx);
+                editor.set_placeholder_text("Type here and press Enter", window, cx);
+                if password {
+                    editor.set_masked(true, cx);
+                }
+                editor
+            });
+
+            self.pending_input = Some(PendingInput {
+                prompt,
+                password,
+                editor,
+                parent_message: message.clone(),
+            });
+            cx.notify();
         }
     }
 
@@ -525,6 +590,10 @@ impl ExecutionView {
                 // Create a marker to clear the output after we get in a new output
                 Output::ClearOutputWaitMarker
             }
+            JupyterMessageContent::InputRequest(_) => {
+                // InputRequest is handled by handle_input_request which needs the full message
+                return;
+            }
             JupyterMessageContent::Status(status) => {
                 match status.execution_state {
                     ExecutionState::Busy => {
@@ -532,6 +601,7 @@ impl ExecutionView {
                     }
                     ExecutionState::Idle => {
                         self.status = ExecutionStatus::Finished;
+                        self.pending_input = None;
                         if self.outputs.is_empty() {
                             cx.emit(ExecutionViewFinishedEmpty);
                         } else if ReplSettings::get_global(cx).inline_output {
@@ -698,7 +768,35 @@ impl Render for ExecutionView {
                 .into_any_element(),
         };
 
-        if self.outputs.is_empty() {
+        let pending_input_element = self.pending_input.as_ref().map(|pending_input| {
+            let prompt_label = if pending_input.prompt.is_empty() {
+                "Input:".to_string()
+            } else {
+                pending_input.prompt.clone()
+            };
+
+            div()
+                .on_action(cx.listener(|this, _: &menu::Confirm, window, cx| {
+                    this.submit_input(window, cx);
+                }))
+                .w_full()
+                .child(
+                    v_flex()
+                        .gap_1()
+                        .child(Label::new(prompt_label).color(Color::Muted))
+                        .child(
+                            div()
+                                .px_2()
+                                .py_1()
+                                .border_1()
+                                .border_color(cx.theme().colors().border)
+                                .rounded_md()
+                                .child(pending_input.editor.clone()),
+                        ),
+                )
+        });
+
+        if self.outputs.is_empty() && pending_input_element.is_none() {
             return v_flex()
                 .min_h(window.line_height())
                 .justify_center()
@@ -713,6 +811,7 @@ impl Render for ExecutionView {
                     .iter()
                     .map(|output| output.render(self.workspace.clone(), window, cx)),
             )
+            .children(pending_input_element)
             .children(match self.status {
                 ExecutionStatus::Executing => vec![status],
                 ExecutionStatus::Queued => vec![status],
@@ -727,8 +826,8 @@ mod tests {
     use super::*;
     use gpui::TestAppContext;
     use runtimelib::{
-        ClearOutput, ErrorOutput, ExecutionState, JupyterMessageContent, MimeType, Status, Stdio,
-        StreamContent,
+        ClearOutput, ErrorOutput, ExecutionState, InputRequest, JupyterMessage,
+        JupyterMessageContent, MimeType, Status, Stdio, StreamContent,
     };
     use settings::SettingsStore;
     use std::path::Path;
@@ -1027,4 +1126,143 @@ mod tests {
             "should emit ExecutionViewFinishedEmpty when idle with no outputs"
         );
     }
+
+    #[gpui::test]
+    async fn test_handle_input_request_creates_pending_input(cx: &mut TestAppContext) {
+        let (mut cx, workspace) = init_test(cx).await;
+        let execution_view = create_execution_view(&mut cx, workspace);
+
+        cx.update(|window, cx| {
+            execution_view.update(cx, |view, cx| {
+                assert!(view.pending_input.is_none());
+
+                let message = JupyterMessage::new(
+                    InputRequest {
+                        prompt: "Enter name: ".to_string(),
+                        password: false,
+                    },
+                    None,
+                );
+                view.handle_input_request(&message, window, cx);
+            });
+        });
+
+        cx.update(|_, cx| {
+            let view = execution_view.read(cx);
+            assert!(view.pending_input.is_some());
+            let pending = view.pending_input.as_ref().unwrap();
+            assert_eq!(pending.prompt, "Enter name: ");
+            assert!(!pending.password);
+        });
+    }
+
+    #[gpui::test]
+    async fn test_handle_input_request_with_password(cx: &mut TestAppContext) {
+        let (mut cx, workspace) = init_test(cx).await;
+        let execution_view = create_execution_view(&mut cx, workspace);
+
+        cx.update(|window, cx| {
+            execution_view.update(cx, |view, cx| {
+                let message = JupyterMessage::new(
+                    InputRequest {
+                        prompt: "Password: ".to_string(),
+                        password: true,
+                    },
+                    None,
+                );
+                view.handle_input_request(&message, window, cx);
+            });
+        });
+
+        cx.update(|_, cx| {
+            let view = execution_view.read(cx);
+            assert!(view.pending_input.is_some());
+            let pending = view.pending_input.as_ref().unwrap();
+            assert_eq!(pending.prompt, "Password: ");
+            assert!(pending.password);
+        });
+    }
+
+    #[gpui::test]
+    async fn test_submit_input_emits_reply_event(cx: &mut TestAppContext) {
+        let (mut cx, workspace) = init_test(cx).await;
+        let execution_view = create_execution_view(&mut cx, workspace);
+
+        let received_value = Arc::new(std::sync::Mutex::new(None::<String>));
+        let received_clone = received_value.clone();
+
+        cx.update(|_, cx| {
+            cx.subscribe(&execution_view, move |_, event: &InputReplyEvent, _cx| {
+                *received_clone.lock().unwrap() = Some(event.value.clone());
+            })
+            .detach();
+        });
+
+        cx.update(|window, cx| {
+            execution_view.update(cx, |view, cx| {
+                let message = JupyterMessage::new(
+                    InputRequest {
+                        prompt: "Name: ".to_string(),
+                        password: false,
+                    },
+                    None,
+                );
+                view.handle_input_request(&message, window, cx);
+
+                // Type into the editor
+                if let Some(ref pending) = view.pending_input {
+                    pending.editor.update(cx, |editor, cx| {
+                        editor.set_text("test_user", window, cx);
+                    });
+                }
+
+                view.submit_input(window, cx);
+            });
+        });
+
+        let value = received_value.lock().unwrap().clone();
+        assert_eq!(value, Some("test_user".to_string()));
+
+        cx.update(|_, cx| {
+            let view = execution_view.read(cx);
+            assert!(
+                view.pending_input.is_none(),
+                "pending_input should be cleared after submit"
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_status_idle_clears_pending_input(cx: &mut TestAppContext) {
+        let (mut cx, workspace) = init_test(cx).await;
+        let execution_view = create_execution_view(&mut cx, workspace);
+
+        cx.update(|window, cx| {
+            execution_view.update(cx, |view, cx| {
+                let message = JupyterMessage::new(
+                    InputRequest {
+                        prompt: "Input: ".to_string(),
+                        password: false,
+                    },
+                    None,
+                );
+                view.handle_input_request(&message, window, cx);
+                assert!(view.pending_input.is_some());
+
+                // Simulate kernel going idle (e.g., execution interrupted)
+                let idle = JupyterMessageContent::Status(Status {
+                    execution_state: ExecutionState::Idle,
+                });
+                view.push_message(&idle, window, cx);
+            });
+        });
+
+        cx.update(|_, cx| {
+            let view = execution_view.read(cx);
+            assert!(
+                view.pending_input.is_none(),
+                "pending_input should be cleared when kernel goes idle"
+            );
+        });
+    }
 }

crates/repl/src/session.rs 🔗

@@ -6,6 +6,7 @@ use crate::{
     kernels::{Kernel, KernelSession, KernelSpecification, NativeRunningKernel},
     outputs::{
         ExecutionStatus, ExecutionView, ExecutionViewFinishedEmpty, ExecutionViewFinishedSmall,
+        InputReplyEvent,
     },
     repl_settings::ReplSettings,
 };
@@ -32,8 +33,8 @@ use gpui::{
 use language::Point;
 use project::Fs;
 use runtimelib::{
-    ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
-    ShutdownRequest,
+    ExecuteRequest, ExecutionState, InputReply, InterruptRequest, JupyterMessage,
+    JupyterMessageContent, ReplyStatus, ShutdownRequest,
 };
 use settings::Settings as _;
 use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
@@ -129,7 +130,11 @@ impl EditorBlock {
         cx: &mut Context<Session>,
     ) {
         self.execution_view.update(cx, |execution_view, cx| {
-            execution_view.push_message(&message.content, window, cx);
+            if matches!(&message.content, JupyterMessageContent::InputRequest(_)) {
+                execution_view.handle_input_request(message, window, cx);
+            } else {
+                execution_view.push_message(&message.content, window, cx);
+            }
         });
     }
 
@@ -424,6 +429,23 @@ impl Session {
         anyhow::Ok(())
     }
 
+    fn send_stdin_reply(
+        &mut self,
+        value: String,
+        parent_message: &JupyterMessage,
+        _cx: &mut Context<Self>,
+    ) {
+        if let Kernel::RunningKernel(kernel) = &mut self.kernel {
+            let reply = InputReply {
+                value,
+                status: ReplyStatus::Ok,
+                error: None,
+            };
+            let message = reply.as_child_of(parent_message);
+            kernel.stdin_tx().try_send(message).log_err();
+        }
+    }
+
     fn replace_block_with_inlay(&mut self, message_id: &str, text: &str, cx: &mut Context<Self>) {
         let Some(block) = self.blocks.remove(message_id) else {
             return;
@@ -511,6 +533,7 @@ impl Session {
 
         let execute_request = ExecuteRequest {
             code,
+            allow_stdin: true,
             ..ExecuteRequest::default()
         };
 
@@ -636,6 +659,14 @@ impl Session {
         );
         self._subscriptions.push(subscription);
 
+        let subscription = cx.subscribe(
+            &editor_block.execution_view,
+            |session, _execution_view, event: &InputReplyEvent, cx| {
+                session.send_stdin_reply(event.value.clone(), &event.parent_message, cx);
+            },
+        );
+        self._subscriptions.push(subscription);
+
         self.blocks
             .insert(message.header.msg_id.clone(), editor_block);