repl: Be more resilient with message parsing (#48837)

Kyle Kelley created

Closes #21529

Show a toast when a message from a Jupyter kernel doesn't parse
properly.

Discovered that some kernels don't include their execution count in an
`ExecuteReply` which is required in the jupyter protocol. Upstream I'll
go make that field either be an `Option` or just stick
`#[serde(default)]` on it so it's not a change in the interface.

Release Notes:

- Show error when parsing a message from a Jupyter kernel fails

Change summary

crates/repl/src/kernels/native_kernel.rs | 69 +++++++++++++++++++++----
1 file changed, 58 insertions(+), 11 deletions(-)

Detailed changes

crates/repl/src/kernels/native_kernel.rs 🔗

@@ -5,13 +5,13 @@ use futures::{
     io::BufReader,
     stream::FuturesUnordered,
 };
-use gpui::{App, AppContext as _, Entity, EntityId, Task, Window};
+use gpui::{App, AppContext as _, ClipboardItem, Entity, EntityId, Task, Window};
 use jupyter_protocol::{
     ExecutionState, JupyterKernelspec, JupyterMessage, JupyterMessageContent, KernelInfoReply,
     connection_info::{ConnectionInfo, Transport},
 };
 use project::Fs;
-use runtimelib::dirs;
+use runtimelib::{RuntimeError, dirs};
 use smol::{net::TcpListener, process::Command};
 use std::{
     env,
@@ -173,16 +173,63 @@ impl NativeRunningKernel {
 
                 async move |cx| -> anyhow::Result<()> {
                     loop {
-                        let message = futures::select! {
-                            msg = iopub.read().fuse() => msg.context("iopub recv")?,
-                            msg = shell.read().fuse() => msg.context("shell recv")?,
-                            msg = control.read().fuse() => msg.context("control recv")?,
+                        let (channel, result) = futures::select! {
+                            msg = iopub.read().fuse() => ("iopub", msg),
+                            msg = shell.read().fuse() => ("shell", msg),
+                            msg = control.read().fuse() => ("control", msg),
                         };
-                        session
-                            .update_in(cx, |session, window, cx| {
-                                session.route(&message, window, cx);
-                            })
-                            .ok();
+                        match result {
+                            Ok(message) => {
+                                session
+                                    .update_in(cx, |session, window, cx| {
+                                        session.route(&message, window, cx);
+                                    })
+                                    .ok();
+                            }
+                            Err(
+                                ref err @ (RuntimeError::ParseError { .. }
+                                | RuntimeError::SerdeError(_)),
+                            ) => {
+                                let error_detail =
+                                    format!("Kernel issue on {channel} channel\n\n{err}");
+                                log::warn!("kernel: {error_detail}");
+                                let workspace_window = session
+                                    .update_in(cx, |_, window, _cx| {
+                                        window
+                                            .window_handle()
+                                            .downcast::<workspace::Workspace>()
+                                    })
+                                    .ok()
+                                    .flatten();
+                                if let Some(workspace_window) = workspace_window {
+                                    workspace_window
+                                        .update(cx, |workspace, _window, cx| {
+                                            struct KernelReadError;
+                                            workspace.show_toast(
+                                                workspace::Toast::new(
+                                                    workspace::notifications::NotificationId::unique::<KernelReadError>(),
+                                                    error_detail.clone(),
+                                                )
+                                                .on_click(
+                                                    "Copy Error",
+                                                    move |_window, cx| {
+                                                        cx.write_to_clipboard(
+                                                            ClipboardItem::new_string(
+                                                                error_detail.clone(),
+                                                            ),
+                                                        );
+                                                    },
+                                                ),
+                                                cx,
+                                            );
+                                        })
+                                        .ok();
+                                }
+                            }
+                            Err(err) => {
+                                anyhow::bail!("{channel} recv: {err}");
+                            }
+                        }
                     }
                 }
             });