From 3248a05406d0c808851383eb9cb20b2a71717173 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 28 Oct 2025 15:45:27 +0100 Subject: [PATCH] Propagate Jupyter client errors (#40886) Closes #40884 - Make IOPub task return a `Result` - Create a monitoring task that watches over IOPub, Control, Routing and Shell tasks. - If any of these tasks fail, report the error with `kernel_errored()` (which is already used to report process crashes) https://github.com/user-attachments/assets/3125f6c7-099a-41ca-b668-fe694ecc68b9 This is not perfect. I did not have time to look into this but: - When such errors happen, the kernel should be shut down. - The kernel should no longer appear as online in the UI But at least the user is getting feedback on what went wrong. Release Notes: - Jupyter client errors are now surfaced in the UI (#40884) --- crates/repl/src/kernels/native_kernel.rs | 55 ++++++++++++++++++------ 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/crates/repl/src/kernels/native_kernel.rs b/crates/repl/src/kernels/native_kernel.rs index cf88fbc582296e1d61ea729a642a7e8ec8e290df..8630768decc6e788efdd3eaaadafc0c957e86d7e 100644 --- a/crates/repl/src/kernels/native_kernel.rs +++ b/crates/repl/src/kernels/native_kernel.rs @@ -3,7 +3,7 @@ use futures::{ AsyncBufReadExt as _, SinkExt as _, channel::mpsc::{self}, io::BufReader, - stream::{SelectAll, StreamExt}, + stream::{FuturesUnordered, SelectAll, StreamExt}, }; use gpui::{App, AppContext as _, Entity, EntityId, Task, Window}; use jupyter_protocol::{ @@ -88,9 +88,6 @@ async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> { pub struct NativeRunningKernel { pub process: smol::process::Child, - _shell_task: Task>, - _control_task: Task>, - _routing_task: Task>, connection_path: PathBuf, _process_status_task: Option>, pub working_directory: PathBuf, @@ -185,27 +182,25 @@ impl NativeRunningKernel { }) .ok(); } - anyhow::Ok(()) } }) .detach(); // iopub task - cx.spawn({ + let iopub_task = cx.spawn({ let session = session.clone(); - async move |cx| { - while let Ok(message) = iopub_socket.read().await { + async move |cx| -> anyhow::Result<()> { + loop { + let message = iopub_socket.read().await?; session .update_in(cx, |session, window, cx| { session.route(&message, window, cx); }) .ok(); } - anyhow::Ok(()) } - }) - .detach(); + }); let (mut control_request_tx, mut control_request_rx) = futures::channel::mpsc::channel(100); @@ -279,6 +274,41 @@ impl NativeRunningKernel { }) .detach(); + cx.spawn({ + let session = session.clone(); + async move |cx| { + async fn with_name( + name: &'static str, + task: Task>, + ) -> (&'static str, Result<()>) { + (name, task.await) + } + + let mut tasks = FuturesUnordered::new(); + tasks.push(with_name("iopub task", iopub_task)); + tasks.push(with_name("shell task", shell_task)); + tasks.push(with_name("control task", control_task)); + tasks.push(with_name("routing task", routing_task)); + + while let Some((name, result)) = tasks.next().await { + if let Err(err) = result { + log::error!("kernel: handling failed for {name}: {err:?}"); + + session + .update(cx, |session, cx| { + session.kernel_errored( + format!("handling failed for {name}: {err}"), + cx, + ); + cx.notify(); + }) + .ok(); + } + } + } + }) + .detach(); + let status = process.status(); let process_status_task = cx.spawn(async move |cx| { @@ -312,9 +342,6 @@ impl NativeRunningKernel { request_tx, working_directory, _process_status_task: Some(process_status_task), - _shell_task: shell_task, - _control_task: control_task, - _routing_task: routing_task, connection_path, execution_state: ExecutionState::Idle, kernel_info: None,