debugger: Fix endless restarts when connecting to TCP adapters over SSH (#34328)

Cole Miller created

Closes #34323
Closes #34313

The previous PR #33932 introduced a way to "close" the
`pending_requests` buffer of the `TransportDelegate`, preventing any
more requests from being added. This prevents pending requests from
accumulating without ever being drained during the shutdown sequence;
without it, some of our tests hang at this point (due to using a
single-threaded executor).

The bug occurred because we were closing `pending_requests` whenever we
detected the server side of the transport shut down, and this closed
state stuck around and interfered with the retry logic for SSH+TCP
adapter connections.

This PR fixes the bug by only closing `pending_requests` on session
shutdown, and adds a regression test covering the SSH retry logic.

Release Notes:

- debugger: Fixed a bug causing SSH connections to some adapters
(Python, Go, JavaScript) to fail and restart endlessly.

Change summary

Cargo.lock                                                    |   1 
crates/collab/Cargo.toml                                      |   1 
crates/collab/src/tests/remote_editing_collaboration_tests.rs | 167 ++
crates/dap/src/adapters.rs                                    |  10 
crates/dap/src/client.rs                                      |  32 
crates/dap/src/transport.rs                                   | 345 +++-
crates/debugger_ui/src/session.rs                             |   2 
crates/debugger_ui/src/session/running.rs                     |   2 
8 files changed, 429 insertions(+), 131 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3109,6 +3109,7 @@ dependencies = [
  "context_server",
  "ctor",
  "dap",
+ "dap-types",
  "dap_adapters",
  "dashmap 6.1.0",
  "debugger_ui",

crates/collab/Cargo.toml 🔗

@@ -94,6 +94,7 @@ context_server.workspace = true
 ctor.workspace = true
 dap = { workspace = true, features = ["test-support"] }
 dap_adapters = { workspace = true, features = ["test-support"] }
+dap-types.workspace = true
 debugger_ui = { workspace = true, features = ["test-support"] }
 editor = { workspace = true, features = ["test-support"] }
 extension.workspace = true

crates/collab/src/tests/remote_editing_collaboration_tests.rs 🔗

@@ -2,6 +2,7 @@ use crate::tests::TestServer;
 use call::ActiveCall;
 use collections::{HashMap, HashSet};
 
+use dap::{Capabilities, adapters::DebugTaskDefinition, transport::RequestHandling};
 use debugger_ui::debugger_panel::DebugPanel;
 use extension::ExtensionHostProxy;
 use fs::{FakeFs, Fs as _, RemoveOptions};
@@ -22,6 +23,7 @@ use language::{
 use node_runtime::NodeRuntime;
 use project::{
     ProjectPath,
+    debugger::session::ThreadId,
     lsp_store::{FormatTrigger, LspFormatTarget},
 };
 use remote::SshRemoteClient;
@@ -29,7 +31,11 @@ use remote_server::{HeadlessAppState, HeadlessProject};
 use rpc::proto;
 use serde_json::json;
 use settings::SettingsStore;
-use std::{path::Path, sync::Arc};
+use std::{
+    path::Path,
+    sync::{Arc, atomic::AtomicUsize},
+};
+use task::TcpArgumentsTemplate;
 use util::path;
 
 #[gpui::test(iterations = 10)]
@@ -688,3 +694,162 @@ async fn test_remote_server_debugger(
 
     shutdown_session.await.unwrap();
 }
+
+#[gpui::test]
+async fn test_slow_adapter_startup_retries(
+    cx_a: &mut TestAppContext,
+    server_cx: &mut TestAppContext,
+    executor: BackgroundExecutor,
+) {
+    cx_a.update(|cx| {
+        release_channel::init(SemanticVersion::default(), cx);
+        command_palette_hooks::init(cx);
+        zlog::init_test();
+        dap_adapters::init(cx);
+    });
+    server_cx.update(|cx| {
+        release_channel::init(SemanticVersion::default(), cx);
+        dap_adapters::init(cx);
+    });
+    let (opts, server_ssh) = SshRemoteClient::fake_server(cx_a, server_cx);
+    let remote_fs = FakeFs::new(server_cx.executor());
+    remote_fs
+        .insert_tree(
+            path!("/code"),
+            json!({
+                "lib.rs": "fn one() -> usize { 1 }"
+            }),
+        )
+        .await;
+
+    // User A connects to the remote project via SSH.
+    server_cx.update(HeadlessProject::init);
+    let remote_http_client = Arc::new(BlockedHttpClient);
+    let node = NodeRuntime::unavailable();
+    let languages = Arc::new(LanguageRegistry::new(server_cx.executor()));
+    let _headless_project = server_cx.new(|cx| {
+        client::init_settings(cx);
+        HeadlessProject::new(
+            HeadlessAppState {
+                session: server_ssh,
+                fs: remote_fs.clone(),
+                http_client: remote_http_client,
+                node_runtime: node,
+                languages,
+                extension_host_proxy: Arc::new(ExtensionHostProxy::new()),
+            },
+            cx,
+        )
+    });
+
+    let client_ssh = SshRemoteClient::fake_client(opts, cx_a).await;
+    let mut server = TestServer::start(server_cx.executor()).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    cx_a.update(|cx| {
+        debugger_ui::init(cx);
+        command_palette_hooks::init(cx);
+    });
+    let (project_a, _) = client_a
+        .build_ssh_project(path!("/code"), client_ssh.clone(), cx_a)
+        .await;
+
+    let (workspace, cx_a) = client_a.build_workspace(&project_a, cx_a);
+
+    let debugger_panel = workspace
+        .update_in(cx_a, |_workspace, window, cx| {
+            cx.spawn_in(window, DebugPanel::load)
+        })
+        .await
+        .unwrap();
+
+    workspace.update_in(cx_a, |workspace, window, cx| {
+        workspace.add_panel(debugger_panel, window, cx);
+    });
+
+    cx_a.run_until_parked();
+    let debug_panel = workspace
+        .update(cx_a, |workspace, cx| workspace.panel::<DebugPanel>(cx))
+        .unwrap();
+
+    let workspace_window = cx_a
+        .window_handle()
+        .downcast::<workspace::Workspace>()
+        .unwrap();
+
+    let count = Arc::new(AtomicUsize::new(0));
+    let session = debugger_ui::tests::start_debug_session_with(
+        &workspace_window,
+        cx_a,
+        DebugTaskDefinition {
+            adapter: "fake-adapter".into(),
+            label: "test".into(),
+            config: json!({
+                "request": "launch"
+            }),
+            tcp_connection: Some(TcpArgumentsTemplate {
+                port: None,
+                host: None,
+                timeout: None,
+            }),
+        },
+        move |client| {
+            let count = count.clone();
+            client.on_request_ext::<dap::requests::Initialize, _>(move |_seq, _request| {
+                if count.fetch_add(1, std::sync::atomic::Ordering::SeqCst) < 5 {
+                    return RequestHandling::Exit;
+                }
+                RequestHandling::Respond(Ok(Capabilities::default()))
+            });
+        },
+    )
+    .unwrap();
+    cx_a.run_until_parked();
+
+    let client = session.update(cx_a, |session, _| session.adapter_client().unwrap());
+    client
+        .fake_event(dap::messages::Events::Stopped(dap::StoppedEvent {
+            reason: dap::StoppedEventReason::Pause,
+            description: None,
+            thread_id: Some(1),
+            preserve_focus_hint: None,
+            text: None,
+            all_threads_stopped: None,
+            hit_breakpoint_ids: None,
+        }))
+        .await;
+
+    cx_a.run_until_parked();
+
+    let active_session = debug_panel
+        .update(cx_a, |this, _| this.active_session())
+        .unwrap();
+
+    let running_state = active_session.update(cx_a, |active_session, _| {
+        active_session.running_state().clone()
+    });
+
+    assert_eq!(
+        client.id(),
+        running_state.read_with(cx_a, |running_state, _| running_state.session_id())
+    );
+    assert_eq!(
+        ThreadId(1),
+        running_state.read_with(cx_a, |running_state, _| running_state
+            .selected_thread_id()
+            .unwrap())
+    );
+
+    let shutdown_session = workspace.update(cx_a, |workspace, cx| {
+        workspace.project().update(cx, |project, cx| {
+            project.dap_store().update(cx, |dap_store, cx| {
+                dap_store.shutdown_session(session.read(cx).session_id(), cx)
+            })
+        })
+    });
+
+    client_ssh.update(cx_a, |a, _| {
+        a.shutdown_processes(Some(proto::ShutdownRemoteServer {}), executor)
+    });
+
+    shutdown_session.await.unwrap();
+}

crates/dap/src/adapters.rs 🔗

@@ -442,10 +442,18 @@ impl DebugAdapter for FakeAdapter {
         _: Option<Vec<String>>,
         _: &mut AsyncApp,
     ) -> Result<DebugAdapterBinary> {
+        let connection = task_definition
+            .tcp_connection
+            .as_ref()
+            .map(|connection| TcpArguments {
+                host: connection.host(),
+                port: connection.port.unwrap_or(17),
+                timeout: connection.timeout,
+            });
         Ok(DebugAdapterBinary {
             command: Some("command".into()),
             arguments: vec![],
-            connection: None,
+            connection,
             envs: HashMap::default(),
             cwd: None,
             request_args: StartDebuggingRequestArguments {

crates/dap/src/client.rs 🔗

@@ -2,7 +2,7 @@ use crate::{
     adapters::DebugAdapterBinary,
     transport::{IoKind, LogKind, TransportDelegate},
 };
-use anyhow::{Context as _, Result};
+use anyhow::Result;
 use dap_types::{
     messages::{Message, Response},
     requests::Request,
@@ -110,9 +110,7 @@ impl DebugAdapterClient {
         self.transport_delegate
             .pending_requests
             .lock()
-            .as_mut()
-            .context("client is closed")?
-            .insert(sequence_id, callback_tx);
+            .insert(sequence_id, callback_tx)?;
 
         log::debug!(
             "Client {} send `{}` request with sequence_id: {}",
@@ -170,6 +168,7 @@ impl DebugAdapterClient {
     pub fn kill(&self) {
         log::debug!("Killing DAP process");
         self.transport_delegate.transport.lock().kill();
+        self.transport_delegate.pending_requests.lock().shutdown();
     }
 
     pub fn has_adapter_logs(&self) -> bool {
@@ -184,11 +183,34 @@ impl DebugAdapterClient {
     }
 
     #[cfg(any(test, feature = "test-support"))]
-    pub fn on_request<R: dap_types::requests::Request, F>(&self, handler: F)
+    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
     where
         F: 'static
             + Send
             + FnMut(u64, R::Arguments) -> Result<R::Response, dap_types::ErrorResponse>,
+    {
+        use crate::transport::RequestHandling;
+
+        self.transport_delegate
+            .transport
+            .lock()
+            .as_fake()
+            .on_request::<R, _>(move |seq, request| {
+                RequestHandling::Respond(handler(seq, request))
+            });
+    }
+
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn on_request_ext<R: dap_types::requests::Request, F>(&self, handler: F)
+    where
+        F: 'static
+            + Send
+            + FnMut(
+                u64,
+                R::Arguments,
+            ) -> crate::transport::RequestHandling<
+                Result<R::Response, dap_types::ErrorResponse>,
+            >,
     {
         self.transport_delegate
             .transport

crates/dap/src/transport.rs 🔗

@@ -49,6 +49,12 @@ pub enum IoKind {
     StdErr,
 }
 
+#[cfg(any(test, feature = "test-support"))]
+pub enum RequestHandling<T> {
+    Respond(T),
+    Exit,
+}
+
 type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 
 pub trait Transport: Send + Sync {
@@ -76,7 +82,11 @@ async fn start(
 ) -> Result<Box<dyn Transport>> {
     #[cfg(any(test, feature = "test-support"))]
     if cfg!(any(test, feature = "test-support")) {
-        return Ok(Box::new(FakeTransport::start(cx).await?));
+        if let Some(connection) = binary.connection.clone() {
+            return Ok(Box::new(FakeTransport::start_tcp(connection, cx).await?));
+        } else {
+            return Ok(Box::new(FakeTransport::start_stdio(cx).await?));
+        }
     }
 
     if binary.connection.is_some() {
@@ -90,11 +100,57 @@ async fn start(
     }
 }
 
+pub(crate) struct PendingRequests {
+    inner: Option<HashMap<u64, oneshot::Sender<Result<Response>>>>,
+}
+
+impl PendingRequests {
+    fn new() -> Self {
+        Self {
+            inner: Some(HashMap::default()),
+        }
+    }
+
+    fn flush(&mut self, e: anyhow::Error) {
+        let Some(inner) = self.inner.as_mut() else {
+            return;
+        };
+        for (_, sender) in inner.drain() {
+            sender.send(Err(e.cloned())).ok();
+        }
+    }
+
+    pub(crate) fn insert(
+        &mut self,
+        sequence_id: u64,
+        callback_tx: oneshot::Sender<Result<Response>>,
+    ) -> anyhow::Result<()> {
+        let Some(inner) = self.inner.as_mut() else {
+            bail!("client is closed")
+        };
+        inner.insert(sequence_id, callback_tx);
+        Ok(())
+    }
+
+    pub(crate) fn remove(
+        &mut self,
+        sequence_id: u64,
+    ) -> anyhow::Result<Option<oneshot::Sender<Result<Response>>>> {
+        let Some(inner) = self.inner.as_mut() else {
+            bail!("client is closed");
+        };
+        Ok(inner.remove(&sequence_id))
+    }
+
+    pub(crate) fn shutdown(&mut self) {
+        self.flush(anyhow!("transport shutdown"));
+        self.inner = None;
+    }
+}
+
 pub(crate) struct TransportDelegate {
     log_handlers: LogHandlers,
-    // TODO this should really be some kind of associative channel
-    pub(crate) pending_requests:
-        Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
+    pub(crate) pending_requests: Arc<Mutex<PendingRequests>>,
     pub(crate) transport: Mutex<Box<dyn Transport>>,
     pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
     tasks: Mutex<Vec<Task<()>>>,
@@ -108,7 +164,7 @@ impl TransportDelegate {
             transport: Mutex::new(transport),
             log_handlers,
             server_tx: Default::default(),
-            pending_requests: Arc::new(Mutex::new(Some(HashMap::default()))),
+            pending_requests: Arc::new(Mutex::new(PendingRequests::new())),
             tasks: Default::default(),
         })
     }
@@ -151,24 +207,10 @@ impl TransportDelegate {
                     Ok(()) => {
                         pending_requests
                             .lock()
-                            .take()
-                            .into_iter()
-                            .flatten()
-                            .for_each(|(_, request)| {
-                                request
-                                    .send(Err(anyhow!("debugger shutdown unexpectedly")))
-                                    .ok();
-                            });
+                            .flush(anyhow!("debugger shutdown unexpectedly"));
                     }
                     Err(e) => {
-                        pending_requests
-                            .lock()
-                            .take()
-                            .into_iter()
-                            .flatten()
-                            .for_each(|(_, request)| {
-                                request.send(Err(e.cloned())).ok();
-                            });
+                        pending_requests.lock().flush(e);
                     }
                 }
             }));
@@ -286,7 +328,7 @@ impl TransportDelegate {
     async fn recv_from_server<Stdout>(
         server_stdout: Stdout,
         mut message_handler: DapMessageHandler,
-        pending_requests: Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
+        pending_requests: Arc<Mutex<PendingRequests>>,
         log_handlers: Option<LogHandlers>,
     ) -> Result<()>
     where
@@ -303,14 +345,10 @@ impl TransportDelegate {
                 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
                 ConnectionResult::ConnectionReset => {
                     log::info!("Debugger closed the connection");
-                    break Ok(());
+                    return Ok(());
                 }
                 ConnectionResult::Result(Ok(Message::Response(res))) => {
-                    let tx = pending_requests
-                        .lock()
-                        .as_mut()
-                        .context("client is closed")?
-                        .remove(&res.request_seq);
+                    let tx = pending_requests.lock().remove(res.request_seq)?;
                     if let Some(tx) = tx {
                         if let Err(e) = tx.send(Self::process_response(res)) {
                             log::trace!("Did not send response `{:?}` for a cancelled", e);
@@ -704,8 +742,7 @@ impl Drop for StdioTransport {
 }
 
 #[cfg(any(test, feature = "test-support"))]
-type RequestHandler =
-    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
+type RequestHandler = Box<dyn Send + FnMut(u64, serde_json::Value) -> RequestHandling<Response>>;
 
 #[cfg(any(test, feature = "test-support"))]
 type ResponseHandler = Box<dyn Send + Fn(Response)>;
@@ -716,23 +753,38 @@ pub struct FakeTransport {
     request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
     // for reverse request responses
     response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
-
-    stdin_writer: Option<PipeWriter>,
-    stdout_reader: Option<PipeReader>,
     message_handler: Option<Task<Result<()>>>,
+    kind: FakeTransportKind,
+}
+
+#[cfg(any(test, feature = "test-support"))]
+pub enum FakeTransportKind {
+    Stdio {
+        stdin_writer: Option<PipeWriter>,
+        stdout_reader: Option<PipeReader>,
+    },
+    Tcp {
+        connection: TcpArguments,
+        executor: BackgroundExecutor,
+    },
 }
 
 #[cfg(any(test, feature = "test-support"))]
 impl FakeTransport {
     pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
     where
-        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
+        F: 'static
+            + Send
+            + FnMut(u64, R::Arguments) -> RequestHandling<Result<R::Response, ErrorResponse>>,
     {
         self.request_handlers.lock().insert(
             R::COMMAND,
             Box::new(move |seq, args| {
                 let result = handler(seq, serde_json::from_value(args).unwrap());
-                let response = match result {
+                let RequestHandling::Respond(response) = result else {
+                    return RequestHandling::Exit;
+                };
+                let response = match response {
                     Ok(response) => Response {
                         seq: seq + 1,
                         request_seq: seq,
@@ -750,7 +802,7 @@ impl FakeTransport {
                         message: None,
                     },
                 };
-                response
+                RequestHandling::Respond(response)
             }),
         );
     }
@@ -764,86 +816,75 @@ impl FakeTransport {
             .insert(R::COMMAND, Box::new(handler));
     }
 
-    async fn start(cx: &mut AsyncApp) -> Result<Self> {
-        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
-        use serde_json::json;
-
-        let (stdin_writer, stdin_reader) = async_pipe::pipe();
-        let (stdout_writer, stdout_reader) = async_pipe::pipe();
-
-        let mut this = Self {
+    async fn start_tcp(connection: TcpArguments, cx: &mut AsyncApp) -> Result<Self> {
+        Ok(Self {
             request_handlers: Arc::new(Mutex::new(HashMap::default())),
             response_handlers: Arc::new(Mutex::new(HashMap::default())),
-            stdin_writer: Some(stdin_writer),
-            stdout_reader: Some(stdout_reader),
             message_handler: None,
-        };
+            kind: FakeTransportKind::Tcp {
+                connection,
+                executor: cx.background_executor().clone(),
+            },
+        })
+    }
 
-        let request_handlers = this.request_handlers.clone();
-        let response_handlers = this.response_handlers.clone();
+    async fn handle_messages(
+        request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
+        response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
+        stdin_reader: PipeReader,
+        stdout_writer: PipeWriter,
+    ) -> Result<()> {
+        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
+        use serde_json::json;
+
+        let mut reader = BufReader::new(stdin_reader);
         let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
+        let mut buffer = String::new();
 
-        this.message_handler = Some(cx.background_spawn(async move {
-            let mut reader = BufReader::new(stdin_reader);
-            let mut buffer = String::new();
+        loop {
+            match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None).await {
+                ConnectionResult::Timeout => {
+                    anyhow::bail!("Timed out when connecting to debugger");
+                }
+                ConnectionResult::ConnectionReset => {
+                    log::info!("Debugger closed the connection");
+                    break Ok(());
+                }
+                ConnectionResult::Result(Err(e)) => break Err(e),
+                ConnectionResult::Result(Ok(message)) => {
+                    match message {
+                        Message::Request(request) => {
+                            // redirect reverse requests to stdout writer/reader
+                            if request.command == RunInTerminal::COMMAND
+                                || request.command == StartDebugging::COMMAND
+                            {
+                                let message =
+                                    serde_json::to_string(&Message::Request(request)).unwrap();
 
-            loop {
-                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
-                    .await
-                {
-                    ConnectionResult::Timeout => {
-                        anyhow::bail!("Timed out when connecting to debugger");
-                    }
-                    ConnectionResult::ConnectionReset => {
-                        log::info!("Debugger closed the connection");
-                        break Ok(());
-                    }
-                    ConnectionResult::Result(Err(e)) => break Err(e),
-                    ConnectionResult::Result(Ok(message)) => {
-                        match message {
-                            Message::Request(request) => {
-                                // redirect reverse requests to stdout writer/reader
-                                if request.command == RunInTerminal::COMMAND
-                                    || request.command == StartDebugging::COMMAND
+                                let mut writer = stdout_writer.lock().await;
+                                writer
+                                    .write_all(
+                                        TransportDelegate::build_rpc_message(message).as_bytes(),
+                                    )
+                                    .await
+                                    .unwrap();
+                                writer.flush().await.unwrap();
+                            } else {
+                                let response = if let Some(handle) =
+                                    request_handlers.lock().get_mut(request.command.as_str())
                                 {
-                                    let message =
-                                        serde_json::to_string(&Message::Request(request)).unwrap();
-
-                                    let mut writer = stdout_writer.lock().await;
-                                    writer
-                                        .write_all(
-                                            TransportDelegate::build_rpc_message(message)
-                                                .as_bytes(),
-                                        )
-                                        .await
-                                        .unwrap();
-                                    writer.flush().await.unwrap();
+                                    handle(request.seq, request.arguments.unwrap_or(json!({})))
                                 } else {
-                                    let response = if let Some(handle) =
-                                        request_handlers.lock().get_mut(request.command.as_str())
-                                    {
-                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
-                                    } else {
-                                        panic!("No request handler for {}", request.command);
-                                    };
-                                    let message =
-                                        serde_json::to_string(&Message::Response(response))
-                                            .unwrap();
-
-                                    let mut writer = stdout_writer.lock().await;
-                                    writer
-                                        .write_all(
-                                            TransportDelegate::build_rpc_message(message)
-                                                .as_bytes(),
-                                        )
-                                        .await
-                                        .unwrap();
-                                    writer.flush().await.unwrap();
-                                }
-                            }
-                            Message::Event(event) => {
+                                    panic!("No request handler for {}", request.command);
+                                };
+                                let response = match response {
+                                    RequestHandling::Respond(response) => response,
+                                    RequestHandling::Exit => {
+                                        break Err(anyhow!("exit in response to request"));
+                                    }
+                                };
                                 let message =
-                                    serde_json::to_string(&Message::Event(event)).unwrap();
+                                    serde_json::to_string(&Message::Response(response)).unwrap();
 
                                 let mut writer = stdout_writer.lock().await;
                                 writer
@@ -854,20 +895,56 @@ impl FakeTransport {
                                     .unwrap();
                                 writer.flush().await.unwrap();
                             }
-                            Message::Response(response) => {
-                                if let Some(handle) =
-                                    response_handlers.lock().get(response.command.as_str())
-                                {
-                                    handle(response);
-                                } else {
-                                    log::error!("No response handler for {}", response.command);
-                                }
+                        }
+                        Message::Event(event) => {
+                            let message = serde_json::to_string(&Message::Event(event)).unwrap();
+
+                            let mut writer = stdout_writer.lock().await;
+                            writer
+                                .write_all(TransportDelegate::build_rpc_message(message).as_bytes())
+                                .await
+                                .unwrap();
+                            writer.flush().await.unwrap();
+                        }
+                        Message::Response(response) => {
+                            if let Some(handle) =
+                                response_handlers.lock().get(response.command.as_str())
+                            {
+                                handle(response);
+                            } else {
+                                log::error!("No response handler for {}", response.command);
                             }
                         }
                     }
                 }
             }
-        }));
+        }
+    }
+
+    async fn start_stdio(cx: &mut AsyncApp) -> Result<Self> {
+        let (stdin_writer, stdin_reader) = async_pipe::pipe();
+        let (stdout_writer, stdout_reader) = async_pipe::pipe();
+        let kind = FakeTransportKind::Stdio {
+            stdin_writer: Some(stdin_writer),
+            stdout_reader: Some(stdout_reader),
+        };
+
+        let mut this = Self {
+            request_handlers: Arc::new(Mutex::new(HashMap::default())),
+            response_handlers: Arc::new(Mutex::new(HashMap::default())),
+            message_handler: None,
+            kind,
+        };
+
+        let request_handlers = this.request_handlers.clone();
+        let response_handlers = this.response_handlers.clone();
+
+        this.message_handler = Some(cx.background_spawn(Self::handle_messages(
+            request_handlers,
+            response_handlers,
+            stdin_reader,
+            stdout_writer,
+        )));
 
         Ok(this)
     }
@@ -876,7 +953,10 @@ impl FakeTransport {
 #[cfg(any(test, feature = "test-support"))]
 impl Transport for FakeTransport {
     fn tcp_arguments(&self) -> Option<TcpArguments> {
-        None
+        match &self.kind {
+            FakeTransportKind::Stdio { .. } => None,
+            FakeTransportKind::Tcp { connection, .. } => Some(connection.clone()),
+        }
     }
 
     fn connect(
@@ -887,12 +967,33 @@ impl Transport for FakeTransport {
             Box<dyn AsyncRead + Unpin + Send + 'static>,
         )>,
     > {
-        let result = util::maybe!({
-            Ok((
-                Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _,
-                Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _,
-            ))
-        });
+        let result = match &mut self.kind {
+            FakeTransportKind::Stdio {
+                stdin_writer,
+                stdout_reader,
+            } => util::maybe!({
+                Ok((
+                    Box::new(stdin_writer.take().context("Cannot reconnect")?) as _,
+                    Box::new(stdout_reader.take().context("Cannot reconnect")?) as _,
+                ))
+            }),
+            FakeTransportKind::Tcp { executor, .. } => {
+                let (stdin_writer, stdin_reader) = async_pipe::pipe();
+                let (stdout_writer, stdout_reader) = async_pipe::pipe();
+
+                let request_handlers = self.request_handlers.clone();
+                let response_handlers = self.response_handlers.clone();
+
+                self.message_handler = Some(executor.spawn(Self::handle_messages(
+                    request_handlers,
+                    response_handlers,
+                    stdin_reader,
+                    stdout_writer,
+                )));
+
+                Ok((Box::new(stdin_writer) as _, Box::new(stdout_reader) as _))
+            }
+        };
         Task::ready(result)
     }
 

crates/debugger_ui/src/session.rs 🔗

@@ -122,7 +122,7 @@ impl DebugSession {
             .to_owned()
     }
 
-    pub(crate) fn running_state(&self) -> &Entity<RunningState> {
+    pub fn running_state(&self) -> &Entity<RunningState> {
         &self.running_state
     }
 

crates/debugger_ui/src/session/running.rs 🔗

@@ -1459,7 +1459,7 @@ impl RunningState {
         }
     }
 
-    pub(crate) fn selected_thread_id(&self) -> Option<ThreadId> {
+    pub fn selected_thread_id(&self) -> Option<ThreadId> {
         self.thread_id
     }