Propagate Jupyter client errors (#40886)

Lionel Henry created

Closes #40884

- Make IOPub task return a `Result`
- Create a monitoring task that watches over IOPub, Control, Routing and
Shell tasks.
- If any of these tasks fail, report the error with `kernel_errored()`
(which is already used to report process crashes)


https://github.com/user-attachments/assets/3125f6c7-099a-41ca-b668-fe694ecc68b9

This is not perfect. I did not have time to look into this but:

- When such errors happen, the kernel should be shut down.
- The kernel should no longer appear as online in the UI

But at least the user is getting feedback on what went wrong.

Release Notes:

- Jupyter client errors are now surfaced in the UI (#40884)

Change summary

crates/repl/src/kernels/native_kernel.rs | 55 +++++++++++++++++++------
1 file changed, 41 insertions(+), 14 deletions(-)

Detailed changes

crates/repl/src/kernels/native_kernel.rs 🔗

@@ -3,7 +3,7 @@ use futures::{
     AsyncBufReadExt as _, SinkExt as _,
     channel::mpsc::{self},
     io::BufReader,
-    stream::{SelectAll, StreamExt},
+    stream::{FuturesUnordered, SelectAll, StreamExt},
 };
 use gpui::{App, AppContext as _, Entity, EntityId, Task, Window};
 use jupyter_protocol::{
@@ -88,9 +88,6 @@ async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> {
 
 pub struct NativeRunningKernel {
     pub process: smol::process::Child,
-    _shell_task: Task<Result<()>>,
-    _control_task: Task<Result<()>>,
-    _routing_task: Task<Result<()>>,
     connection_path: PathBuf,
     _process_status_task: Option<Task<()>>,
     pub working_directory: PathBuf,
@@ -185,27 +182,25 @@ impl NativeRunningKernel {
                             })
                             .ok();
                     }
-                    anyhow::Ok(())
                 }
             })
             .detach();
 
             // iopub task
-            cx.spawn({
+            let iopub_task = cx.spawn({
                 let session = session.clone();
 
-                async move |cx| {
-                    while let Ok(message) = iopub_socket.read().await {
+                async move |cx| -> anyhow::Result<()> {
+                    loop {
+                        let message = iopub_socket.read().await?;
                         session
                             .update_in(cx, |session, window, cx| {
                                 session.route(&message, window, cx);
                             })
                             .ok();
                     }
-                    anyhow::Ok(())
                 }
-            })
-            .detach();
+            });
 
             let (mut control_request_tx, mut control_request_rx) =
                 futures::channel::mpsc::channel(100);
@@ -279,6 +274,41 @@ impl NativeRunningKernel {
             })
             .detach();
 
+            cx.spawn({
+                let session = session.clone();
+                async move |cx| {
+                    async fn with_name(
+                        name: &'static str,
+                        task: Task<Result<()>>,
+                    ) -> (&'static str, Result<()>) {
+                        (name, task.await)
+                    }
+
+                    let mut tasks = FuturesUnordered::new();
+                    tasks.push(with_name("iopub task", iopub_task));
+                    tasks.push(with_name("shell task", shell_task));
+                    tasks.push(with_name("control task", control_task));
+                    tasks.push(with_name("routing task", routing_task));
+
+                    while let Some((name, result)) = tasks.next().await {
+                        if let Err(err) = result {
+                            log::error!("kernel: handling failed for {name}: {err:?}");
+
+                            session
+                                .update(cx, |session, cx| {
+                                    session.kernel_errored(
+                                        format!("handling failed for {name}: {err}"),
+                                        cx,
+                                    );
+                                    cx.notify();
+                                })
+                                .ok();
+                        }
+                    }
+                }
+            })
+            .detach();
+
             let status = process.status();
 
             let process_status_task = cx.spawn(async move |cx| {
@@ -312,9 +342,6 @@ impl NativeRunningKernel {
                 request_tx,
                 working_directory,
                 _process_status_task: Some(process_status_task),
-                _shell_task: shell_task,
-                _control_task: control_task,
-                _routing_task: routing_task,
                 connection_path,
                 execution_state: ExecutionState::Idle,
                 kernel_info: None,