debugger: Fix issues with restarting sessions (cherry-pick #33932) (#34100)

gcp-cherry-pick-bot[bot] , Cole Miller , and Remco Smits created

Cherry-picked debugger: Fix issues with restarting sessions (#33932)

Restarting sessions was broken in #33273 when we moved away from calling
`kill` in the shutdown sequence. This PR re-adds that `kill` call so
that old debug adapter processes will be cleaned up when sessions are
restarted within Zed. This doesn't re-introduce the issue that motivated
the original changes to the shutdown sequence, because we still send
Disconnect/Terminate to debug adapters when quitting Zed without killing
the process directly.

We also now remove manually-restarted sessions eagerly from the session
list.

Closes #33916 

Release Notes:

- debugger: Fixed not being able to restart sessions for Debugpy and
other adapters that communicate over TCP.
- debugger: Fixed debug adapter processes not being cleaned up.

---------

Co-authored-by: Remco Smits <djsmits12@gmail.com>

Co-authored-by: Cole Miller <cole@zed.dev>
Co-authored-by: Remco Smits <djsmits12@gmail.com>

Change summary

crates/dap/src/client.rs                     |  8 ++
crates/dap/src/transport.rs                  | 65 +++++++++++----------
crates/debugger_ui/src/debugger_panel.rs     | 18 +++--
crates/debugger_ui/src/tests/attach_modal.rs | 10 ---
crates/project/src/debugger/session.rs       | 43 ++++++++++----
5 files changed, 82 insertions(+), 62 deletions(-)

Detailed changes

crates/dap/src/client.rs 🔗

@@ -2,7 +2,7 @@ use crate::{
     adapters::DebugAdapterBinary,
     transport::{IoKind, LogKind, TransportDelegate},
 };
-use anyhow::Result;
+use anyhow::{Context as _, Result};
 use dap_types::{
     messages::{Message, Response},
     requests::Request,
@@ -108,7 +108,11 @@ impl DebugAdapterClient {
             arguments: Some(serialized_arguments),
         };
         self.transport_delegate
-            .add_pending_request(sequence_id, callback_tx);
+            .pending_requests
+            .lock()
+            .as_mut()
+            .context("client is closed")?
+            .insert(sequence_id, callback_tx);
 
         log::debug!(
             "Client {} send `{}` request with sequence_id: {}",

crates/dap/src/transport.rs 🔗

@@ -49,7 +49,6 @@ pub enum IoKind {
     StdErr,
 }
 
-type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
 type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 
 pub trait Transport: Send + Sync {
@@ -93,18 +92,14 @@ async fn start(
 
 pub(crate) struct TransportDelegate {
     log_handlers: LogHandlers,
-    pub(crate) pending_requests: Requests,
+    // TODO this should really be some kind of associative channel
+    pub(crate) pending_requests:
+        Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
     pub(crate) transport: Mutex<Box<dyn Transport>>,
     pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
     tasks: Mutex<Vec<Task<()>>>,
 }
 
-impl Drop for TransportDelegate {
-    fn drop(&mut self) {
-        self.transport.lock().kill()
-    }
-}
-
 impl TransportDelegate {
     pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
         let log_handlers: LogHandlers = Default::default();
@@ -113,7 +108,7 @@ impl TransportDelegate {
             transport: Mutex::new(transport),
             log_handlers,
             server_tx: Default::default(),
-            pending_requests: Default::default(),
+            pending_requests: Arc::new(Mutex::new(Some(HashMap::default()))),
             tasks: Default::default(),
         })
     }
@@ -154,16 +149,26 @@ impl TransportDelegate {
                 .await
                 {
                     Ok(()) => {
-                        pending_requests.lock().drain().for_each(|(_, request)| {
-                            request
-                                .send(Err(anyhow!("debugger shutdown unexpectedly")))
-                                .ok();
-                        });
+                        pending_requests
+                            .lock()
+                            .take()
+                            .into_iter()
+                            .flatten()
+                            .for_each(|(_, request)| {
+                                request
+                                    .send(Err(anyhow!("debugger shutdown unexpectedly")))
+                                    .ok();
+                            });
                     }
                     Err(e) => {
-                        pending_requests.lock().drain().for_each(|(_, request)| {
-                            request.send(Err(e.cloned())).ok();
-                        });
+                        pending_requests
+                            .lock()
+                            .take()
+                            .into_iter()
+                            .flatten()
+                            .for_each(|(_, request)| {
+                                request.send(Err(e.cloned())).ok();
+                            });
                     }
                 }
             }));
@@ -188,15 +193,6 @@ impl TransportDelegate {
         self.transport.lock().tcp_arguments()
     }
 
-    pub(crate) fn add_pending_request(
-        &self,
-        sequence_id: u64,
-        request: oneshot::Sender<Result<Response>>,
-    ) {
-        let mut pending_requests = self.pending_requests.lock();
-        pending_requests.insert(sequence_id, request);
-    }
-
     pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
         if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
             server_tx.send(message).await.context("sending message")
@@ -290,7 +286,7 @@ impl TransportDelegate {
     async fn recv_from_server<Stdout>(
         server_stdout: Stdout,
         mut message_handler: DapMessageHandler,
-        pending_requests: Requests,
+        pending_requests: Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
         log_handlers: Option<LogHandlers>,
     ) -> Result<()>
     where
@@ -300,16 +296,21 @@ impl TransportDelegate {
         let mut reader = BufReader::new(server_stdout);
 
         let result = loop {
-            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
-                .await
-            {
+            let result =
+                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
+                    .await;
+            match result {
                 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
                 ConnectionResult::ConnectionReset => {
                     log::info!("Debugger closed the connection");
-                    return Ok(());
+                    break Ok(());
                 }
                 ConnectionResult::Result(Ok(Message::Response(res))) => {
-                    let tx = pending_requests.lock().remove(&res.request_seq);
+                    let tx = pending_requests
+                        .lock()
+                        .as_mut()
+                        .context("client is closed")?
+                        .remove(&res.request_seq);
                     if let Some(tx) = tx {
                         if let Err(e) = tx.send(Self::process_response(res)) {
                             log::trace!("Did not send response `{:?}` for a cancelled", e);

crates/debugger_ui/src/debugger_panel.rs 🔗

@@ -33,7 +33,7 @@ use std::sync::{Arc, LazyLock};
 use task::{DebugScenario, TaskContext};
 use tree_sitter::{Query, StreamingIterator as _};
 use ui::{ContextMenu, Divider, PopoverMenuHandle, Tooltip, prelude::*};
-use util::maybe;
+use util::{ResultExt, maybe};
 use workspace::SplitDirection;
 use workspace::{
     Pane, Workspace,
@@ -363,11 +363,17 @@ impl DebugPanel {
         let label = curr_session.read(cx).label().clone();
         let adapter = curr_session.read(cx).adapter().clone();
         let binary = curr_session.read(cx).binary().cloned().unwrap();
-        let task = curr_session.update(cx, |session, cx| session.shutdown(cx));
         let task_context = curr_session.read(cx).task_context().clone();
 
+        let curr_session_id = curr_session.read(cx).session_id();
+        self.sessions
+            .retain(|session| session.read(cx).session_id(cx) != curr_session_id);
+        let task = dap_store_handle.update(cx, |dap_store, cx| {
+            dap_store.shutdown_session(curr_session_id, cx)
+        });
+
         cx.spawn_in(window, async move |this, cx| {
-            task.await;
+            task.await.log_err();
 
             let (session, task) = dap_store_handle.update(cx, |dap_store, cx| {
                 let session = dap_store.new_session(label, adapter, task_context, None, cx);
@@ -1298,9 +1304,7 @@ impl Panel for DebugPanel {
 
 impl Render for DebugPanel {
     fn render(&mut self, window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
-        let has_sessions = self.sessions.len() > 0;
         let this = cx.weak_entity();
-        debug_assert_eq!(has_sessions, self.active_session.is_some());
 
         if self
             .active_session
@@ -1487,8 +1491,8 @@ impl Render for DebugPanel {
                 }))
             })
             .map(|this| {
-                if has_sessions {
-                    this.children(self.active_session.clone())
+                if let Some(active_session) = self.active_session.clone() {
+                    this.child(active_session)
                 } else {
                     let docked_to_bottom = self.position(window, cx) == DockPosition::Bottom;
                     let welcome_experience = v_flex()

crates/debugger_ui/src/tests/attach_modal.rs 🔗

@@ -27,7 +27,7 @@ async fn test_direct_attach_to_process(executor: BackgroundExecutor, cx: &mut Te
     let workspace = init_test_workspace(&project, cx).await;
     let cx = &mut VisualTestContext::from_window(*workspace, cx);
 
-    let session = start_debug_session_with(
+    let _session = start_debug_session_with(
         &workspace,
         cx,
         DebugTaskDefinition {
@@ -59,14 +59,6 @@ async fn test_direct_attach_to_process(executor: BackgroundExecutor, cx: &mut Te
             assert!(workspace.active_modal::<AttachModal>(cx).is_none());
         })
         .unwrap();
-
-    let shutdown_session = project.update(cx, |project, cx| {
-        project.dap_store().update(cx, |dap_store, cx| {
-            dap_store.shutdown_session(session.read(cx).session_id(), cx)
-        })
-    });
-
-    shutdown_session.await.unwrap();
 }
 
 #[gpui::test]

crates/project/src/debugger/session.rs 🔗

@@ -660,6 +660,7 @@ pub struct Session {
     ignore_breakpoints: bool,
     exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
     background_tasks: Vec<Task<()>>,
+    restart_task: Option<Task<()>>,
     task_context: TaskContext,
 }
 
@@ -821,6 +822,7 @@ impl Session {
                 loaded_sources: Vec::default(),
                 threads: IndexMap::default(),
                 background_tasks: Vec::default(),
+                restart_task: None,
                 locations: Default::default(),
                 is_session_terminated: false,
                 ignore_breakpoints: false,
@@ -1865,18 +1867,30 @@ impl Session {
     }
 
     pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
-        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
-            self.request(
-                RestartCommand {
-                    raw: args.unwrap_or(Value::Null),
-                },
-                Self::fallback_to_manual_restart,
-                cx,
-            )
-            .detach();
-        } else {
-            cx.emit(SessionStateEvent::Restart);
+        if self.restart_task.is_some() || self.as_running().is_none() {
+            return;
         }
+
+        let supports_dap_restart =
+            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
+
+        self.restart_task = Some(cx.spawn(async move |this, cx| {
+            let _ = this.update(cx, |session, cx| {
+                if supports_dap_restart {
+                    session
+                        .request(
+                            RestartCommand {
+                                raw: args.unwrap_or(Value::Null),
+                            },
+                            Self::fallback_to_manual_restart,
+                            cx,
+                        )
+                        .detach();
+                } else {
+                    cx.emit(SessionStateEvent::Restart);
+                }
+            });
+        }));
     }
 
     pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
@@ -1914,8 +1928,13 @@ impl Session {
 
         cx.emit(SessionStateEvent::Shutdown);
 
-        cx.spawn(async move |_, _| {
+        cx.spawn(async move |this, cx| {
             task.await;
+            let _ = this.update(cx, |this, _| {
+                if let Some(adapter_client) = this.adapter_client() {
+                    adapter_client.kill();
+                }
+            });
         })
     }