From 8249ef56187b966c33f6667d0d3a35d88d8f2dc0 Mon Sep 17 00:00:00 2001 From: Kyle Kelley Date: Thu, 12 Feb 2026 23:18:55 -0800 Subject: [PATCH] repl: Initial stdin support for kernels (#48851) Support stdin from Jupyter kernels AKA `input()` and `getpass()` in IPython. image image Closes #22746 Release Notes: - Added STDIN support (`input` / `raw_input`) to REPL --- Cargo.lock | 8 +- Cargo.toml | 4 +- crates/repl/src/kernels/mod.rs | 1 + crates/repl/src/kernels/native_kernel.rs | 39 +++- crates/repl/src/kernels/remote_kernels.rs | 9 + crates/repl/src/outputs.rs | 246 +++++++++++++++++++++- crates/repl/src/session.rs | 37 +++- 7 files changed, 329 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1579f5608183850f3fe4bc2cf2b916940ba3d38f..38d70b9dbfc439700145f22ac8350e110f0534d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8838,9 +8838,9 @@ dependencies = [ [[package]] name = "jupyter-protocol" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "073486929b8271fc18bd001fb8604f4b4d88c0fae134b88ed943c46c8826d9eb" +checksum = "5fecdcf39420574a8df6fa5758cecafa99a4af93a80ca2a9a96596f9b301e3a5" dependencies = [ "async-trait", "bytes 1.11.1", @@ -14237,9 +14237,9 @@ dependencies = [ [[package]] name = "runtimelib" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a8031614aa3913648d167bc69e2b9fda7731f2226ef588b50323c392bfeb58" +checksum = "d80685459e1e5fa5603182058351ae91c98ca458dfef4e85f0a37be4f7cf1e6c" dependencies = [ "async-dispatcher", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 586a7ce0331785fddf11ecab11fcdb2ef2952e5c..f8a6ee68c21fe2e3e921b692bffb23ca7fc4f6d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -553,7 +553,7 @@ itertools = "0.14.0" json_dotpath = "1.1" jsonschema = "0.37.0" jsonwebtoken = "10.0" -jupyter-protocol = "1.1.1" +jupyter-protocol = "1.2.0" jupyter-websocket-client = "1.0.0" libc = "0.2" libsqlite3-sys = { version = "0.30.1", features = ["bundled"] } @@ -636,7 +636,7 @@ reqwest = { git = "https://github.com/zed-industries/reqwest.git", rev = "c15662 "stream", ], package = "zed-reqwest", version = "0.12.15-zed" } rsa = "0.9.6" -runtimelib = { version = "1.1.0", default-features = false, features = [ +runtimelib = { version = "1.2.0", default-features = false, features = [ "async-dispatcher-runtime", "aws-lc-rs" ] } rust-embed = { version = "8.4", features = ["include-exclude"] } diff --git a/crates/repl/src/kernels/mod.rs b/crates/repl/src/kernels/mod.rs index ceef195f737465afd064790b675e4051786b5aa6..03d352ecac15cd61d8b9592ecf36b9110913c86e 100644 --- a/crates/repl/src/kernels/mod.rs +++ b/crates/repl/src/kernels/mod.rs @@ -230,6 +230,7 @@ pub fn python_env_kernel_specifications( pub trait RunningKernel: Send + Debug { fn request_tx(&self) -> mpsc::Sender; + fn stdin_tx(&self) -> mpsc::Sender; fn working_directory(&self) -> &PathBuf; fn execution_state(&self) -> &ExecutionState; fn set_execution_state(&mut self, state: ExecutionState); diff --git a/crates/repl/src/kernels/native_kernel.rs b/crates/repl/src/kernels/native_kernel.rs index 30e2740fb92c85f9b52e48b6e41593c639350344..bf7a1effc059d5b9fa67e8cd926d27b0c9137923 100644 --- a/crates/repl/src/kernels/native_kernel.rs +++ b/crates/repl/src/kernels/native_kernel.rs @@ -90,6 +90,7 @@ pub struct NativeRunningKernel { _process_status_task: Option>, pub working_directory: PathBuf, pub request_tx: mpsc::Sender, + pub stdin_tx: mpsc::Sender, pub execution_state: ExecutionState, pub kernel_info: Option, } @@ -154,22 +155,39 @@ impl NativeRunningKernel { let iopub_socket = runtimelib::create_client_iopub_connection(&connection_info, "", &session_id) .await?; - let shell_socket = - runtimelib::create_client_shell_connection(&connection_info, &session_id).await?; let control_socket = runtimelib::create_client_control_connection(&connection_info, &session_id).await?; + let peer_identity = runtimelib::peer_identity_for_session(&session_id)?; + let shell_socket = + runtimelib::create_client_shell_connection_with_identity( + &connection_info, + &session_id, + peer_identity.clone(), + ) + .await?; + let stdin_socket = runtimelib::create_client_stdin_connection_with_identity( + &connection_info, + &session_id, + peer_identity, + ) + .await?; + let (mut shell_send, shell_recv) = shell_socket.split(); let (mut control_send, control_recv) = control_socket.split(); + let (mut stdin_send, stdin_recv) = stdin_socket.split(); let (request_tx, mut request_rx) = futures::channel::mpsc::channel::(100); + let (stdin_tx, mut stdin_rx) = + futures::channel::mpsc::channel::(100); let recv_task = cx.spawn({ let session = session.clone(); let mut iopub = iopub_socket; let mut shell = shell_recv; let mut control = control_recv; + let mut stdin = stdin_recv; async move |cx| -> anyhow::Result<()> { loop { @@ -177,6 +195,7 @@ impl NativeRunningKernel { msg = iopub.read().fuse() => ("iopub", msg), msg = shell.read().fuse() => ("shell", msg), msg = control.read().fuse() => ("control", msg), + msg = stdin.read().fuse() => ("stdin", msg), }; match result { Ok(message) => { @@ -252,6 +271,15 @@ impl NativeRunningKernel { } }); + let stdin_routing_task = cx.background_spawn({ + async move { + while let Some(message) = stdin_rx.next().await { + stdin_send.send(message).await?; + } + anyhow::Ok(()) + } + }); + let stderr = process.stderr.take(); let stdout = process.stdout.take(); @@ -294,6 +322,7 @@ impl NativeRunningKernel { let mut tasks = FuturesUnordered::new(); tasks.push(with_name("recv task", recv_task)); tasks.push(with_name("routing task", routing_task)); + tasks.push(with_name("stdin routing task", stdin_routing_task)); while let Some((name, result)) = tasks.next().await { if let Err(err) = result { @@ -341,6 +370,7 @@ impl NativeRunningKernel { anyhow::Ok(Box::new(Self { process, request_tx, + stdin_tx, working_directory, _process_status_task: Some(process_status_task), connection_path, @@ -356,6 +386,10 @@ impl RunningKernel for NativeRunningKernel { self.request_tx.clone() } + fn stdin_tx(&self) -> mpsc::Sender { + self.stdin_tx.clone() + } + fn working_directory(&self) -> &PathBuf { &self.working_directory } @@ -384,6 +418,7 @@ impl RunningKernel for NativeRunningKernel { fn kill(&mut self) { self._process_status_task.take(); self.request_tx.close_channel(); + self.stdin_tx.close_channel(); self.process.kill().ok(); } } diff --git a/crates/repl/src/kernels/remote_kernels.rs b/crates/repl/src/kernels/remote_kernels.rs index 165ca387d0ea98fd0402753fa26b39f8b21c33ca..8315f95833ccb17c50462d7259655e6b420b886b 100644 --- a/crates/repl/src/kernels/remote_kernels.rs +++ b/crates/repl/src/kernels/remote_kernels.rs @@ -119,6 +119,7 @@ pub struct RemoteRunningKernel { http_client: Arc, pub working_directory: std::path::PathBuf, pub request_tx: mpsc::Sender, + pub stdin_tx: mpsc::Sender, pub execution_state: ExecutionState, pub kernel_info: Option, pub kernel_id: String, @@ -211,12 +212,15 @@ impl RemoteRunningKernel { } }); + let stdin_tx = request_tx.clone(); + anyhow::Ok(Box::new(Self { _routing_task: routing_task, _receiving_task: receiving_task, remote_server, working_directory, request_tx, + stdin_tx, // todo(kyle): pull this from the kernel API to start with execution_state: ExecutionState::Idle, kernel_info: None, @@ -245,6 +249,10 @@ impl RunningKernel for RemoteRunningKernel { self.request_tx.clone() } + fn stdin_tx(&self) -> futures::channel::mpsc::Sender { + self.stdin_tx.clone() + } + fn working_directory(&self) -> &std::path::PathBuf { &self.working_directory } @@ -292,5 +300,6 @@ impl RunningKernel for RemoteRunningKernel { fn kill(&mut self) { self.request_tx.close_channel(); + self.stdin_tx.close_channel(); } } diff --git a/crates/repl/src/outputs.rs b/crates/repl/src/outputs.rs index 6686b2003abc8222f4044a8c711be86e18d8c116..0fdc2798822504c34737978996fc2a18cccb0e39 100644 --- a/crates/repl/src/outputs.rs +++ b/crates/repl/src/outputs.rs @@ -36,7 +36,8 @@ use editor::{Editor, MultiBuffer}; use gpui::{AnyElement, ClipboardItem, Entity, EventEmitter, Render, WeakEntity}; use language::Buffer; -use runtimelib::{ExecutionState, JupyterMessageContent, MimeBundle, MimeType}; +use menu; +use runtimelib::{ExecutionState, JupyterMessage, JupyterMessageContent, MimeBundle, MimeType}; use ui::{CommonAnimationExt, CopyButton, IconButton, Tooltip, prelude::*}; mod image; @@ -441,6 +442,18 @@ pub enum ExecutionStatus { pub struct ExecutionViewFinishedEmpty; pub struct ExecutionViewFinishedSmall(pub String); +pub struct InputReplyEvent { + pub value: String, + pub parent_message: JupyterMessage, +} + +struct PendingInput { + prompt: String, + password: bool, + editor: Entity, + parent_message: JupyterMessage, +} + /// An ExecutionView shows the outputs of an execution. /// It can hold zero or more outputs, which the user /// sees as "the output" for a single execution. @@ -449,10 +462,12 @@ pub struct ExecutionView { workspace: WeakEntity, pub outputs: Vec, pub status: ExecutionStatus, + pending_input: Option, } impl EventEmitter for ExecutionView {} impl EventEmitter for ExecutionView {} +impl EventEmitter for ExecutionView {} impl ExecutionView { pub fn new( @@ -464,6 +479,56 @@ impl ExecutionView { workspace, outputs: Default::default(), status, + pending_input: None, + } + } + + fn submit_input(&mut self, _window: &mut Window, cx: &mut Context) { + if let Some(pending_input) = self.pending_input.take() { + let value = pending_input.editor.read(cx).text(cx); + + let display_text = if pending_input.password { + format!("{}{}", pending_input.prompt, "*".repeat(value.len())) + } else { + format!("{}{}", pending_input.prompt, value) + }; + self.outputs.push(Output::Message(display_text)); + + cx.emit(InputReplyEvent { + value, + parent_message: pending_input.parent_message, + }); + cx.notify(); + } + } + + /// Handle an InputRequest message, storing the full message for replying + pub fn handle_input_request( + &mut self, + message: &JupyterMessage, + window: &mut Window, + cx: &mut Context, + ) { + if let JupyterMessageContent::InputRequest(input_request) = &message.content { + let prompt = input_request.prompt.clone(); + let password = input_request.password; + + let editor = cx.new(|cx| { + let mut editor = Editor::single_line(window, cx); + editor.set_placeholder_text("Type here and press Enter", window, cx); + if password { + editor.set_masked(true, cx); + } + editor + }); + + self.pending_input = Some(PendingInput { + prompt, + password, + editor, + parent_message: message.clone(), + }); + cx.notify(); } } @@ -525,6 +590,10 @@ impl ExecutionView { // Create a marker to clear the output after we get in a new output Output::ClearOutputWaitMarker } + JupyterMessageContent::InputRequest(_) => { + // InputRequest is handled by handle_input_request which needs the full message + return; + } JupyterMessageContent::Status(status) => { match status.execution_state { ExecutionState::Busy => { @@ -532,6 +601,7 @@ impl ExecutionView { } ExecutionState::Idle => { self.status = ExecutionStatus::Finished; + self.pending_input = None; if self.outputs.is_empty() { cx.emit(ExecutionViewFinishedEmpty); } else if ReplSettings::get_global(cx).inline_output { @@ -698,7 +768,35 @@ impl Render for ExecutionView { .into_any_element(), }; - if self.outputs.is_empty() { + let pending_input_element = self.pending_input.as_ref().map(|pending_input| { + let prompt_label = if pending_input.prompt.is_empty() { + "Input:".to_string() + } else { + pending_input.prompt.clone() + }; + + div() + .on_action(cx.listener(|this, _: &menu::Confirm, window, cx| { + this.submit_input(window, cx); + })) + .w_full() + .child( + v_flex() + .gap_1() + .child(Label::new(prompt_label).color(Color::Muted)) + .child( + div() + .px_2() + .py_1() + .border_1() + .border_color(cx.theme().colors().border) + .rounded_md() + .child(pending_input.editor.clone()), + ), + ) + }); + + if self.outputs.is_empty() && pending_input_element.is_none() { return v_flex() .min_h(window.line_height()) .justify_center() @@ -713,6 +811,7 @@ impl Render for ExecutionView { .iter() .map(|output| output.render(self.workspace.clone(), window, cx)), ) + .children(pending_input_element) .children(match self.status { ExecutionStatus::Executing => vec![status], ExecutionStatus::Queued => vec![status], @@ -727,8 +826,8 @@ mod tests { use super::*; use gpui::TestAppContext; use runtimelib::{ - ClearOutput, ErrorOutput, ExecutionState, JupyterMessageContent, MimeType, Status, Stdio, - StreamContent, + ClearOutput, ErrorOutput, ExecutionState, InputRequest, JupyterMessage, + JupyterMessageContent, MimeType, Status, Stdio, StreamContent, }; use settings::SettingsStore; use std::path::Path; @@ -1027,4 +1126,143 @@ mod tests { "should emit ExecutionViewFinishedEmpty when idle with no outputs" ); } + + #[gpui::test] + async fn test_handle_input_request_creates_pending_input(cx: &mut TestAppContext) { + let (mut cx, workspace) = init_test(cx).await; + let execution_view = create_execution_view(&mut cx, workspace); + + cx.update(|window, cx| { + execution_view.update(cx, |view, cx| { + assert!(view.pending_input.is_none()); + + let message = JupyterMessage::new( + InputRequest { + prompt: "Enter name: ".to_string(), + password: false, + }, + None, + ); + view.handle_input_request(&message, window, cx); + }); + }); + + cx.update(|_, cx| { + let view = execution_view.read(cx); + assert!(view.pending_input.is_some()); + let pending = view.pending_input.as_ref().unwrap(); + assert_eq!(pending.prompt, "Enter name: "); + assert!(!pending.password); + }); + } + + #[gpui::test] + async fn test_handle_input_request_with_password(cx: &mut TestAppContext) { + let (mut cx, workspace) = init_test(cx).await; + let execution_view = create_execution_view(&mut cx, workspace); + + cx.update(|window, cx| { + execution_view.update(cx, |view, cx| { + let message = JupyterMessage::new( + InputRequest { + prompt: "Password: ".to_string(), + password: true, + }, + None, + ); + view.handle_input_request(&message, window, cx); + }); + }); + + cx.update(|_, cx| { + let view = execution_view.read(cx); + assert!(view.pending_input.is_some()); + let pending = view.pending_input.as_ref().unwrap(); + assert_eq!(pending.prompt, "Password: "); + assert!(pending.password); + }); + } + + #[gpui::test] + async fn test_submit_input_emits_reply_event(cx: &mut TestAppContext) { + let (mut cx, workspace) = init_test(cx).await; + let execution_view = create_execution_view(&mut cx, workspace); + + let received_value = Arc::new(std::sync::Mutex::new(None::)); + let received_clone = received_value.clone(); + + cx.update(|_, cx| { + cx.subscribe(&execution_view, move |_, event: &InputReplyEvent, _cx| { + *received_clone.lock().unwrap() = Some(event.value.clone()); + }) + .detach(); + }); + + cx.update(|window, cx| { + execution_view.update(cx, |view, cx| { + let message = JupyterMessage::new( + InputRequest { + prompt: "Name: ".to_string(), + password: false, + }, + None, + ); + view.handle_input_request(&message, window, cx); + + // Type into the editor + if let Some(ref pending) = view.pending_input { + pending.editor.update(cx, |editor, cx| { + editor.set_text("test_user", window, cx); + }); + } + + view.submit_input(window, cx); + }); + }); + + let value = received_value.lock().unwrap().clone(); + assert_eq!(value, Some("test_user".to_string())); + + cx.update(|_, cx| { + let view = execution_view.read(cx); + assert!( + view.pending_input.is_none(), + "pending_input should be cleared after submit" + ); + }); + } + + #[gpui::test] + async fn test_status_idle_clears_pending_input(cx: &mut TestAppContext) { + let (mut cx, workspace) = init_test(cx).await; + let execution_view = create_execution_view(&mut cx, workspace); + + cx.update(|window, cx| { + execution_view.update(cx, |view, cx| { + let message = JupyterMessage::new( + InputRequest { + prompt: "Input: ".to_string(), + password: false, + }, + None, + ); + view.handle_input_request(&message, window, cx); + assert!(view.pending_input.is_some()); + + // Simulate kernel going idle (e.g., execution interrupted) + let idle = JupyterMessageContent::Status(Status { + execution_state: ExecutionState::Idle, + }); + view.push_message(&idle, window, cx); + }); + }); + + cx.update(|_, cx| { + let view = execution_view.read(cx); + assert!( + view.pending_input.is_none(), + "pending_input should be cleared when kernel goes idle" + ); + }); + } } diff --git a/crates/repl/src/session.rs b/crates/repl/src/session.rs index fcb06c1409c00a6eebf25d48fde89d63ea1d070e..b939dfedc230a32e554bc5ff379f879143e788d1 100644 --- a/crates/repl/src/session.rs +++ b/crates/repl/src/session.rs @@ -6,6 +6,7 @@ use crate::{ kernels::{Kernel, KernelSession, KernelSpecification, NativeRunningKernel}, outputs::{ ExecutionStatus, ExecutionView, ExecutionViewFinishedEmpty, ExecutionViewFinishedSmall, + InputReplyEvent, }, repl_settings::ReplSettings, }; @@ -32,8 +33,8 @@ use gpui::{ use language::Point; use project::Fs; use runtimelib::{ - ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent, - ShutdownRequest, + ExecuteRequest, ExecutionState, InputReply, InterruptRequest, JupyterMessage, + JupyterMessageContent, ReplyStatus, ShutdownRequest, }; use settings::Settings as _; use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration}; @@ -129,7 +130,11 @@ impl EditorBlock { cx: &mut Context, ) { self.execution_view.update(cx, |execution_view, cx| { - execution_view.push_message(&message.content, window, cx); + if matches!(&message.content, JupyterMessageContent::InputRequest(_)) { + execution_view.handle_input_request(message, window, cx); + } else { + execution_view.push_message(&message.content, window, cx); + } }); } @@ -424,6 +429,23 @@ impl Session { anyhow::Ok(()) } + fn send_stdin_reply( + &mut self, + value: String, + parent_message: &JupyterMessage, + _cx: &mut Context, + ) { + if let Kernel::RunningKernel(kernel) = &mut self.kernel { + let reply = InputReply { + value, + status: ReplyStatus::Ok, + error: None, + }; + let message = reply.as_child_of(parent_message); + kernel.stdin_tx().try_send(message).log_err(); + } + } + fn replace_block_with_inlay(&mut self, message_id: &str, text: &str, cx: &mut Context) { let Some(block) = self.blocks.remove(message_id) else { return; @@ -511,6 +533,7 @@ impl Session { let execute_request = ExecuteRequest { code, + allow_stdin: true, ..ExecuteRequest::default() }; @@ -636,6 +659,14 @@ impl Session { ); self._subscriptions.push(subscription); + let subscription = cx.subscribe( + &editor_block.execution_view, + |session, _execution_view, event: &InputReplyEvent, cx| { + session.send_stdin_reply(event.value.clone(), &event.parent_message, cx); + }, + ); + self._subscriptions.push(subscription); + self.blocks .insert(message.header.msg_id.clone(), editor_block);