@@ -8,8 +8,7 @@ use dap_types::{
requests::Request,
};
use futures::channel::oneshot;
-use gpui::{AppContext, AsyncApp};
-use smol::channel::{Receiver, Sender};
+use gpui::AsyncApp;
use std::{
hash::Hash,
sync::atomic::{AtomicU64, Ordering},
@@ -44,97 +43,54 @@ impl DebugAdapterClient {
id: SessionId,
binary: DebugAdapterBinary,
message_handler: DapMessageHandler,
- cx: AsyncApp,
+ cx: &mut AsyncApp,
) -> Result<Self> {
- let ((server_rx, server_tx), transport_delegate) =
- TransportDelegate::start(&binary, cx.clone()).await?;
+ let transport_delegate = TransportDelegate::start(&binary, cx).await?;
let this = Self {
id,
binary,
transport_delegate,
sequence_count: AtomicU64::new(1),
};
- log::info!("Successfully connected to debug adapter");
+ this.connect(message_handler, cx).await?;
- let client_id = this.id;
+ Ok(this)
+ }
- // start handling events/reverse requests
- cx.background_spawn(Self::handle_receive_messages(
- client_id,
- server_rx,
- server_tx.clone(),
- message_handler,
- ))
- .detach();
+ pub fn should_reconnect_for_ssh(&self) -> bool {
+ self.transport_delegate.tcp_arguments().is_some()
+ && self.binary.command.as_deref() == Some("ssh")
+ }
- Ok(this)
+ pub async fn connect(
+ &self,
+ message_handler: DapMessageHandler,
+ cx: &mut AsyncApp,
+ ) -> Result<()> {
+ self.transport_delegate.connect(message_handler, cx).await
}
- pub async fn reconnect(
+ pub async fn create_child_connection(
&self,
session_id: SessionId,
binary: DebugAdapterBinary,
message_handler: DapMessageHandler,
- cx: AsyncApp,
+ cx: &mut AsyncApp,
) -> Result<Self> {
- let binary = match self.transport_delegate.transport() {
- crate::transport::Transport::Tcp(tcp_transport) => DebugAdapterBinary {
+ let binary = if let Some(connection) = self.transport_delegate.tcp_arguments() {
+ DebugAdapterBinary {
command: None,
arguments: Default::default(),
envs: Default::default(),
cwd: Default::default(),
- connection: Some(crate::adapters::TcpArguments {
- host: tcp_transport.host,
- port: tcp_transport.port,
- timeout: Some(tcp_transport.timeout),
- }),
+ connection: Some(connection),
request_args: binary.request_args,
- },
- _ => self.binary.clone(),
- };
-
- Self::start(session_id, binary, message_handler, cx).await
- }
-
- async fn handle_receive_messages(
- client_id: SessionId,
- server_rx: Receiver<Message>,
- client_tx: Sender<Message>,
- mut message_handler: DapMessageHandler,
- ) -> Result<()> {
- let result = loop {
- let message = match server_rx.recv().await {
- Ok(message) => message,
- Err(e) => break Err(e.into()),
- };
- match message {
- Message::Event(ev) => {
- log::debug!("Client {} received event `{}`", client_id.0, &ev);
-
- message_handler(Message::Event(ev))
- }
- Message::Request(req) => {
- log::debug!(
- "Client {} received reverse request `{}`",
- client_id.0,
- &req.command
- );
-
- message_handler(Message::Request(req))
- }
- Message::Response(response) => {
- log::debug!("Received response after request timeout: {:#?}", response);
- }
}
-
- smol::future::yield_now().await;
+ } else {
+ self.binary.clone()
};
- drop(client_tx);
-
- log::debug!("Handle receive messages dropped");
-
- result
+ Self::start(session_id, binary, message_handler, cx).await
}
/// Send a request to an adapter and get a response back
@@ -152,8 +108,7 @@ impl DebugAdapterClient {
arguments: Some(serialized_arguments),
};
self.transport_delegate
- .add_pending_request(sequence_id, callback_tx)
- .await;
+ .add_pending_request(sequence_id, callback_tx);
log::debug!(
"Client {} send `{}` request with sequence_id: {}",
@@ -230,8 +185,11 @@ impl DebugAdapterClient {
+ Send
+ FnMut(u64, R::Arguments) -> Result<R::Response, dap_types::ErrorResponse>,
{
- let transport = self.transport_delegate.transport().as_fake();
- transport.on_request::<R, F>(handler);
+ self.transport_delegate
+ .transport
+ .lock()
+ .as_fake()
+ .on_request::<R, F>(handler);
}
#[cfg(any(test, feature = "test-support"))]
@@ -250,8 +208,11 @@ impl DebugAdapterClient {
where
F: 'static + Send + Fn(Response),
{
- let transport = self.transport_delegate.transport().as_fake();
- transport.on_response::<R, F>(handler).await;
+ self.transport_delegate
+ .transport
+ .lock()
+ .as_fake()
+ .on_response::<R, F>(handler);
}
#[cfg(any(test, feature = "test-support"))]
@@ -308,7 +269,7 @@ mod tests {
},
},
Box::new(|_| panic!("Did not expect to hit this code path")),
- cx.to_async(),
+ &mut cx.to_async(),
)
.await
.unwrap();
@@ -390,7 +351,7 @@ mod tests {
);
}
}),
- cx.to_async(),
+ &mut cx.to_async(),
)
.await
.unwrap();
@@ -448,7 +409,7 @@ mod tests {
);
}
}),
- cx.to_async(),
+ &mut cx.to_async(),
)
.await
.unwrap();
@@ -1,16 +1,19 @@
use anyhow::{Context as _, Result, anyhow, bail};
+#[cfg(any(test, feature = "test-support"))]
+use async_pipe::{PipeReader, PipeWriter};
use dap_types::{
ErrorResponse,
messages::{Message, Response},
};
use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
-use gpui::{AppContext as _, AsyncApp, Task};
+use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
+use parking_lot::Mutex;
+use proto::ErrorExt;
use settings::Settings as _;
use smallvec::SmallVec;
use smol::{
channel::{Receiver, Sender, unbounded},
io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
- lock::Mutex,
net::{TcpListener, TcpStream},
};
use std::{
@@ -23,7 +26,11 @@ use std::{
use task::TcpArgumentsTemplate;
use util::ConnectionResult;
-use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
+use crate::{
+ adapters::{DebugAdapterBinary, TcpArguments},
+ client::DapMessageHandler,
+ debugger_settings::DebuggerSettings,
+};
pub(crate) type IoMessage = str;
pub(crate) type Command = str;
@@ -35,232 +42,152 @@ pub enum LogKind {
Rpc,
}
+#[derive(Clone, Copy)]
pub enum IoKind {
StdIn,
StdOut,
StdErr,
}
-pub struct TransportPipe {
- input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
- output: Box<dyn AsyncRead + Unpin + Send + 'static>,
- stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
- stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
-}
-
-impl TransportPipe {
- pub fn new(
- input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
- output: Box<dyn AsyncRead + Unpin + Send + 'static>,
- stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
- stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
- ) -> Self {
- TransportPipe {
- input,
- output,
- stdout,
- stderr,
- }
- }
-}
-
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
-type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
+type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
-pub enum Transport {
- Stdio(StdioTransport),
- Tcp(TcpTransport),
+pub trait Transport: Send + Sync {
+ fn has_adapter_logs(&self) -> bool;
+ fn tcp_arguments(&self) -> Option<TcpArguments>;
+ fn connect(
+ &mut self,
+ ) -> Task<
+ Result<(
+ Box<dyn AsyncWrite + Unpin + Send + 'static>,
+ Box<dyn AsyncRead + Unpin + Send + 'static>,
+ )>,
+ >;
+ fn kill(&self);
#[cfg(any(test, feature = "test-support"))]
- Fake(FakeTransport),
-}
-
-impl Transport {
- async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
- #[cfg(any(test, feature = "test-support"))]
- if cfg!(any(test, feature = "test-support")) {
- return FakeTransport::start(cx)
- .await
- .map(|(transports, fake)| (transports, Self::Fake(fake)));
- }
-
- if binary.connection.is_some() {
- TcpTransport::start(binary, cx)
- .await
- .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
- .context("Tried to connect to a debug adapter via TCP transport layer")
- } else {
- StdioTransport::start(binary, cx)
- .await
- .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
- .context("Tried to connect to a debug adapter via stdin/stdout transport layer")
- }
- }
-
- fn has_adapter_logs(&self) -> bool {
- match self {
- Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
- Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
- #[cfg(any(test, feature = "test-support"))]
- Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
- }
+ fn as_fake(&self) -> &FakeTransport {
+ unreachable!()
}
+}
- async fn kill(&self) {
- match self {
- Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
- Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
- #[cfg(any(test, feature = "test-support"))]
- Transport::Fake(fake_transport) => fake_transport.kill().await,
- }
+async fn start(
+ binary: &DebugAdapterBinary,
+ log_handlers: LogHandlers,
+ cx: &mut AsyncApp,
+) -> Result<Box<dyn Transport>> {
+ #[cfg(any(test, feature = "test-support"))]
+ if cfg!(any(test, feature = "test-support")) {
+ return Ok(Box::new(FakeTransport::start(cx).await?));
}
- #[cfg(any(test, feature = "test-support"))]
- pub(crate) fn as_fake(&self) -> &FakeTransport {
- match self {
- Transport::Fake(fake_transport) => fake_transport,
- _ => panic!("Not a fake transport layer"),
- }
+ if binary.connection.is_some() {
+ Ok(Box::new(
+ TcpTransport::start(binary, log_handlers, cx).await?,
+ ))
+ } else {
+ Ok(Box::new(
+ StdioTransport::start(binary, log_handlers, cx).await?,
+ ))
}
}
pub(crate) struct TransportDelegate {
log_handlers: LogHandlers,
- current_requests: Requests,
pending_requests: Requests,
- transport: Transport,
- server_tx: Arc<Mutex<Option<Sender<Message>>>>,
- _tasks: Vec<Task<()>>,
+ pub(crate) transport: Mutex<Box<dyn Transport>>,
+ server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
+ tasks: Mutex<Vec<Task<()>>>,
}
impl TransportDelegate {
- pub(crate) async fn start(
- binary: &DebugAdapterBinary,
- cx: AsyncApp,
- ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
- let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
- let mut this = Self {
- transport,
+ pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
+ let log_handlers: LogHandlers = Default::default();
+ let transport = start(binary, log_handlers.clone(), cx).await?;
+ Ok(Self {
+ transport: Mutex::new(transport),
+ log_handlers,
server_tx: Default::default(),
- log_handlers: Default::default(),
- current_requests: Default::default(),
pending_requests: Default::default(),
- _tasks: Vec::new(),
- };
- let messages = this.start_handlers(transport_pipes, cx).await?;
- Ok((messages, this))
+ tasks: Default::default(),
+ })
}
- async fn start_handlers(
- &mut self,
- mut params: TransportPipe,
- cx: AsyncApp,
- ) -> Result<(Receiver<Message>, Sender<Message>)> {
- let (client_tx, server_rx) = unbounded::<Message>();
+ pub async fn connect(
+ &self,
+ message_handler: DapMessageHandler,
+ cx: &mut AsyncApp,
+ ) -> Result<()> {
let (server_tx, client_rx) = unbounded::<Message>();
+ self.tasks.lock().clear();
let log_dap_communications =
cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
.with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
.unwrap_or(false);
+ let connect = self.transport.lock().connect();
+ let (input, output) = connect.await?;
+
let log_handler = if log_dap_communications {
Some(self.log_handlers.clone())
} else {
None
};
- let adapter_log_handler = log_handler.clone();
- cx.update(|cx| {
- if let Some(stdout) = params.stdout.take() {
- self._tasks.push(cx.background_spawn(async move {
- match Self::handle_adapter_log(stdout, adapter_log_handler).await {
- ConnectionResult::Timeout => {
- log::error!("Timed out when handling debugger log");
- }
- ConnectionResult::ConnectionReset => {
- log::info!("Debugger logs connection closed");
- }
- ConnectionResult::Result(Ok(())) => {}
- ConnectionResult::Result(Err(e)) => {
- log::error!("Error handling debugger log: {e}");
- }
- }
- }));
- }
-
- let pending_requests = self.pending_requests.clone();
- let output_log_handler = log_handler.clone();
- self._tasks.push(cx.background_spawn(async move {
- match Self::handle_output(
- params.output,
- client_tx,
+ let pending_requests = self.pending_requests.clone();
+ let output_log_handler = log_handler.clone();
+ {
+ let mut tasks = self.tasks.lock();
+ tasks.push(cx.background_spawn(async move {
+ match Self::recv_from_server(
+ output,
+ message_handler,
pending_requests.clone(),
output_log_handler,
)
.await
{
- Ok(()) => {}
- Err(e) => log::error!("Error handling debugger output: {e}"),
+ Ok(()) => {
+ pending_requests.lock().drain().for_each(|(_, request)| {
+ request
+ .send(Err(anyhow!("debugger shutdown unexpectedly")))
+ .ok();
+ });
+ }
+ Err(e) => {
+ pending_requests.lock().drain().for_each(|(_, request)| {
+ request.send(Err(e.cloned())).ok();
+ });
+ }
}
- let mut pending_requests = pending_requests.lock().await;
- pending_requests.drain().for_each(|(_, request)| {
- request
- .send(Err(anyhow!("debugger shutdown unexpectedly")))
- .ok();
- });
}));
- if let Some(stderr) = params.stderr.take() {
- let log_handlers = self.log_handlers.clone();
- self._tasks.push(cx.background_spawn(async move {
- match Self::handle_error(stderr, log_handlers).await {
- ConnectionResult::Timeout => {
- log::error!("Timed out reading debugger error stream")
- }
- ConnectionResult::ConnectionReset => {
- log::info!("Debugger closed its error stream")
- }
- ConnectionResult::Result(Ok(())) => {}
- ConnectionResult::Result(Err(e)) => {
- log::error!("Error handling debugger error: {e}")
- }
- }
- }));
- }
-
- let current_requests = self.current_requests.clone();
- let pending_requests = self.pending_requests.clone();
- let log_handler = log_handler.clone();
- self._tasks.push(cx.background_spawn(async move {
- match Self::handle_input(
- params.input,
- client_rx,
- current_requests,
- pending_requests,
- log_handler,
- )
- .await
- {
+ tasks.push(cx.background_spawn(async move {
+ match Self::send_to_server(input, client_rx, log_handler).await {
Ok(()) => {}
Err(e) => log::error!("Error handling debugger input: {e}"),
}
}));
- })?;
+ }
{
let mut lock = self.server_tx.lock().await;
*lock = Some(server_tx.clone());
}
- Ok((server_rx, server_tx))
+ Ok(())
}
- pub(crate) async fn add_pending_request(
+ pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
+ self.transport.lock().tcp_arguments()
+ }
+
+ pub(crate) fn add_pending_request(
&self,
sequence_id: u64,
request: oneshot::Sender<Result<Response>>,
) {
- let mut pending_requests = self.pending_requests.lock().await;
+ let mut pending_requests = self.pending_requests.lock();
pending_requests.insert(sequence_id, request);
}
@@ -272,52 +199,41 @@ impl TransportDelegate {
}
}
- async fn handle_adapter_log<Stdout>(
- stdout: Stdout,
- log_handlers: Option<LogHandlers>,
- ) -> ConnectionResult<()>
- where
- Stdout: AsyncRead + Unpin + Send + 'static,
- {
+ async fn handle_adapter_log(
+ stdout: impl AsyncRead + Unpin + Send + 'static,
+ iokind: IoKind,
+ log_handlers: LogHandlers,
+ ) {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
- let result = loop {
+ loop {
line.truncate(0);
- match reader
- .read_line(&mut line)
- .await
- .context("reading adapter log line")
- {
- Ok(0) => break ConnectionResult::ConnectionReset,
+ match reader.read_line(&mut line).await {
+ Ok(0) => break,
Ok(_) => {}
- Err(e) => break ConnectionResult::Result(Err(e)),
+ Err(e) => {
+ log::debug!("handle_adapter_log: {}", e);
+ break;
+ }
}
- if let Some(log_handlers) = log_handlers.as_ref() {
- for (kind, handler) in log_handlers.lock().iter_mut() {
- if matches!(kind, LogKind::Adapter) {
- handler(IoKind::StdOut, None, line.as_str());
- }
+ for (kind, handler) in log_handlers.lock().iter_mut() {
+ if matches!(kind, LogKind::Adapter) {
+ handler(iokind, None, line.as_str());
}
}
- };
-
- log::debug!("Handle adapter log dropped");
-
- result
+ }
}
fn build_rpc_message(message: String) -> String {
format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
}
- async fn handle_input<Stdin>(
+ async fn send_to_server<Stdin>(
mut server_stdin: Stdin,
client_rx: Receiver<Message>,
- current_requests: Requests,
- pending_requests: Requests,
log_handlers: Option<LogHandlers>,
) -> Result<()>
where
@@ -326,12 +242,6 @@ impl TransportDelegate {
let result = loop {
match client_rx.recv().await {
Ok(message) => {
- if let Message::Request(request) = &message {
- if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
- pending_requests.lock().await.insert(request.seq, sender);
- }
- }
-
let command = match &message {
Message::Request(request) => Some(request.command.as_str()),
Message::Response(response) => Some(response.command.as_str()),
@@ -371,9 +281,9 @@ impl TransportDelegate {
result
}
- async fn handle_output<Stdout>(
+ async fn recv_from_server<Stdout>(
server_stdout: Stdout,
- client_tx: Sender<Message>,
+ mut message_handler: DapMessageHandler,
pending_requests: Requests,
log_handlers: Option<LogHandlers>,
) -> Result<()>
@@ -393,59 +303,25 @@ impl TransportDelegate {
return Ok(());
}
ConnectionResult::Result(Ok(Message::Response(res))) => {
- if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
+ let tx = pending_requests.lock().remove(&res.request_seq);
+ if let Some(tx) = tx {
if let Err(e) = tx.send(Self::process_response(res)) {
log::trace!("Did not send response `{:?}` for a cancelled", e);
}
} else {
- client_tx.send(Message::Response(res)).await?;
+ message_handler(Message::Response(res))
}
}
- ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
+ ConnectionResult::Result(Ok(message)) => message_handler(message),
ConnectionResult::Result(Err(e)) => break Err(e),
}
};
- drop(client_tx);
log::debug!("Handle adapter output dropped");
result
}
- async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
- where
- Stderr: AsyncRead + Unpin + Send + 'static,
- {
- log::debug!("Handle error started");
- let mut buffer = String::new();
-
- let mut reader = BufReader::new(stderr);
-
- let result = loop {
- match reader
- .read_line(&mut buffer)
- .await
- .context("reading error log line")
- {
- Ok(0) => break ConnectionResult::ConnectionReset,
- Ok(_) => {
- for (kind, log_handler) in log_handlers.lock().iter_mut() {
- if matches!(kind, LogKind::Adapter) {
- log_handler(IoKind::StdErr, None, buffer.as_str());
- }
- }
-
- buffer.truncate(0);
- }
- Err(error) => break ConnectionResult::Result(Err(error)),
- }
- };
-
- log::debug!("Handle adapter error dropped");
-
- result
- }
-
fn process_response(response: Response) -> Result<Response> {
if response.success {
Ok(response)
@@ -479,14 +355,10 @@ impl TransportDelegate {
loop {
buffer.truncate(0);
- match reader
- .read_line(buffer)
- .await
- .with_context(|| "reading a message from server")
- {
+ match reader.read_line(buffer).await {
Ok(0) => return ConnectionResult::ConnectionReset,
Ok(_) => {}
- Err(e) => return ConnectionResult::Result(Err(e)),
+ Err(e) => return ConnectionResult::Result(Err(e.into())),
};
if buffer == "\r\n" {
@@ -547,16 +419,8 @@ impl TransportDelegate {
server_tx.close();
}
- let mut current_requests = self.current_requests.lock().await;
- let mut pending_requests = self.pending_requests.lock().await;
-
- current_requests.clear();
- pending_requests.clear();
-
- self.transport.kill().await;
-
- drop(current_requests);
- drop(pending_requests);
+ self.pending_requests.lock().clear();
+ self.transport.lock().kill();
log::debug!("Shutdown client completed");
@@ -564,11 +428,7 @@ impl TransportDelegate {
}
pub fn has_adapter_logs(&self) -> bool {
- self.transport.has_adapter_logs()
- }
-
- pub fn transport(&self) -> &Transport {
- &self.transport
+ self.transport.lock().has_adapter_logs()
}
pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
@@ -581,10 +441,13 @@ impl TransportDelegate {
}
pub struct TcpTransport {
+ executor: BackgroundExecutor,
pub port: u16,
pub host: Ipv4Addr,
pub timeout: u64,
- process: Option<Mutex<Child>>,
+ process: Arc<Mutex<Option<Child>>>,
+ _stderr_task: Option<Task<()>>,
+ _stdout_task: Option<Task<()>>,
}
impl TcpTransport {
@@ -604,7 +467,11 @@ impl TcpTransport {
.port())
}
- async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
+ async fn start(
+ binary: &DebugAdapterBinary,
+ log_handlers: LogHandlers,
+ cx: &mut AsyncApp,
+ ) -> Result<Self> {
let connection_args = binary
.connection
.as_ref()
@@ -613,7 +480,11 @@ impl TcpTransport {
let host = connection_args.host;
let port = connection_args.port;
- let mut process = if let Some(command) = &binary.command {
+ let mut process = None;
+ let mut stdout_task = None;
+ let mut stderr_task = None;
+
+ if let Some(command) = &binary.command {
let mut command = util::command::new_std_command(&command);
if let Some(cwd) = &binary.cwd {
@@ -623,101 +494,142 @@ impl TcpTransport {
command.args(&binary.arguments);
command.envs(&binary.envs);
- Some(
- Child::spawn(command, Stdio::null())
- .with_context(|| "failed to start debug adapter.")?,
- )
- } else {
- None
+ let mut p = Child::spawn(command, Stdio::null())
+ .with_context(|| "failed to start debug adapter.")?;
+
+ stdout_task = p.stdout.take().map(|stdout| {
+ cx.background_executor()
+ .spawn(TransportDelegate::handle_adapter_log(
+ stdout,
+ IoKind::StdOut,
+ log_handlers.clone(),
+ ))
+ });
+ stderr_task = p.stderr.take().map(|stderr| {
+ cx.background_executor()
+ .spawn(TransportDelegate::handle_adapter_log(
+ stderr,
+ IoKind::StdErr,
+ log_handlers,
+ ))
+ });
+ process = Some(p);
};
- let address = SocketAddrV4::new(host, port);
-
let timeout = connection_args.timeout.unwrap_or_else(|| {
cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
- .unwrap_or(2000u64)
+ .unwrap_or(20000u64)
});
- let (mut process, (rx, tx)) = select! {
- _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
- anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
- },
- result = cx.spawn(async move |cx| {
- loop {
- match TcpStream::connect(address).await {
- Ok(stream) => return Ok((process, stream.split())),
- Err(_) => {
- if let Some(p) = &mut process {
- if let Ok(Some(_)) = p.try_status() {
- let output = process.take().unwrap().into_inner().output().await?;
- let output = if output.stderr.is_empty() {
- String::from_utf8_lossy(&output.stdout).to_string()
- } else {
- String::from_utf8_lossy(&output.stderr).to_string()
- };
- anyhow::bail!("{output}\nerror: process exited before debugger attached.");
- }
- }
-
- cx.background_executor().timer(Duration::from_millis(100)).await;
- }
- }
- }
- }).fuse() => result?
- };
-
log::info!(
"Debug adapter has connected to TCP server {}:{}",
host,
port
);
- let stdout = process.as_mut().and_then(|p| p.stdout.take());
- let stderr = process.as_mut().and_then(|p| p.stderr.take());
let this = Self {
+ executor: cx.background_executor().clone(),
port,
host,
- process: process.map(Mutex::new),
+ process: Arc::new(Mutex::new(process)),
timeout,
+ _stdout_task: stdout_task,
+ _stderr_task: stderr_task,
};
- let pipe = TransportPipe::new(
- Box::new(tx),
- Box::new(BufReader::new(rx)),
- stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
- stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
- );
-
- Ok((pipe, this))
+ Ok(this)
}
+}
+impl Transport for TcpTransport {
fn has_adapter_logs(&self) -> bool {
true
}
- async fn kill(&self) {
- if let Some(process) = &self.process {
- let mut process = process.lock().await;
- Child::kill(&mut process);
+ fn kill(&self) {
+ if let Some(process) = &mut *self.process.lock() {
+ process.kill();
}
}
+
+ fn tcp_arguments(&self) -> Option<TcpArguments> {
+ Some(TcpArguments {
+ host: self.host,
+ port: self.port,
+ timeout: Some(self.timeout),
+ })
+ }
+
+ fn connect(
+ &mut self,
+ ) -> Task<
+ Result<(
+ Box<dyn AsyncWrite + Unpin + Send + 'static>,
+ Box<dyn AsyncRead + Unpin + Send + 'static>,
+ )>,
+ > {
+ let executor = self.executor.clone();
+ let timeout = self.timeout;
+ let address = SocketAddrV4::new(self.host, self.port);
+ let process = self.process.clone();
+ executor.clone().spawn(async move {
+ select! {
+ _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
+ anyhow::bail!("Connection to TCP DAP timeout {address}");
+ },
+ result = executor.clone().spawn(async move {
+ loop {
+ match TcpStream::connect(address).await {
+ Ok(stream) => {
+ let (read, write) = stream.split();
+ return Ok((Box::new(write) as _, Box::new(read) as _))
+ },
+ Err(_) => {
+ let has_process = process.lock().is_some();
+ if has_process {
+ let status = process.lock().as_mut().unwrap().try_status();
+ if let Ok(Some(_)) = status {
+ let process = process.lock().take().unwrap().into_inner();
+ let output = process.output().await?;
+ let output = if output.stderr.is_empty() {
+ String::from_utf8_lossy(&output.stdout).to_string()
+ } else {
+ String::from_utf8_lossy(&output.stderr).to_string()
+ };
+ anyhow::bail!("{output}\nerror: process exited before debugger attached.");
+ }
+ }
+
+ executor.timer(Duration::from_millis(100)).await;
+ }
+ }
+ }
+ }).fuse() => result
+ }
+ })
+ }
}
impl Drop for TcpTransport {
fn drop(&mut self) {
- if let Some(mut p) = self.process.take() {
- p.get_mut().kill();
+ if let Some(mut p) = self.process.lock().take() {
+ p.kill();
}
}
}
pub struct StdioTransport {
process: Mutex<Child>,
+ _stderr_task: Option<Task<()>>,
}
impl StdioTransport {
- #[allow(dead_code, reason = "This is used in non test builds of Zed")]
- async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
+ // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
+ async fn start(
+ binary: &DebugAdapterBinary,
+ log_handlers: LogHandlers,
+ cx: &mut AsyncApp,
+ ) -> Result<Self> {
let Some(binary_command) = &binary.command else {
bail!(
"When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
@@ -740,42 +652,52 @@ impl StdioTransport {
)
})?;
- let stdin = process.stdin.take().context("Failed to open stdin")?;
- let stdout = process.stdout.take().context("Failed to open stdout")?;
- let stderr = process
- .stderr
- .take()
- .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
-
- if stderr.is_none() {
- bail!(
- "Failed to connect to stderr for debug adapter command {}",
- &binary_command
- );
- }
-
- log::info!("Debug adapter has connected to stdio adapter");
+ let err_task = process.stderr.take().map(|stderr| {
+ cx.background_spawn(TransportDelegate::handle_adapter_log(
+ stderr,
+ IoKind::StdErr,
+ log_handlers,
+ ))
+ });
let process = Mutex::new(process);
- Ok((
- TransportPipe::new(
- Box::new(stdin),
- Box::new(BufReader::new(stdout)),
- None,
- stderr,
- ),
- Self { process },
- ))
+ Ok(Self {
+ process,
+ _stderr_task: err_task,
+ })
}
+}
+impl Transport for StdioTransport {
fn has_adapter_logs(&self) -> bool {
false
}
- async fn kill(&self) {
- let mut process = self.process.lock().await;
- Child::kill(&mut process);
+ fn kill(&self) {
+ self.process.lock().kill()
+ }
+
+ fn connect(
+ &mut self,
+ ) -> Task<
+ Result<(
+ Box<dyn AsyncWrite + Unpin + Send + 'static>,
+ Box<dyn AsyncRead + Unpin + Send + 'static>,
+ )>,
+ > {
+ let mut process = self.process.lock();
+ let result = util::maybe!({
+ Ok((
+ Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
+ Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
+ ))
+ });
+ Task::ready(result)
+ }
+
+ fn tcp_arguments(&self) -> Option<TcpArguments> {
+ None
}
}
@@ -795,9 +717,12 @@ type ResponseHandler = Box<dyn Send + Fn(Response)>;
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
// for sending fake response back from adapter side
- request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
+ request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
// for reverse request responses
- response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
+ response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
+
+ stdin_writer: Option<PipeWriter>,
+ stdout_reader: Option<PipeReader>,
}
#[cfg(any(test, feature = "test-support"))]
@@ -833,7 +758,7 @@ impl FakeTransport {
);
}
- pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
+ pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
where
F: 'static + Send + Fn(Response),
{
@@ -842,20 +767,23 @@ impl FakeTransport {
.insert(R::COMMAND, Box::new(handler));
}
- async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
- let this = Self {
- request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
- response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
- };
+ async fn start(cx: &mut AsyncApp) -> Result<Self> {
use dap_types::requests::{Request, RunInTerminal, StartDebugging};
use serde_json::json;
let (stdin_writer, stdin_reader) = async_pipe::pipe();
let (stdout_writer, stdout_reader) = async_pipe::pipe();
+ let this = Self {
+ request_handlers: Arc::new(Mutex::new(HashMap::default())),
+ response_handlers: Arc::new(Mutex::new(HashMap::default())),
+ stdin_writer: Some(stdin_writer),
+ stdout_reader: Some(stdout_reader),
+ };
+
let request_handlers = this.request_handlers.clone();
let response_handlers = this.response_handlers.clone();
- let stdout_writer = Arc::new(Mutex::new(stdout_writer));
+ let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
cx.background_spawn(async move {
let mut reader = BufReader::new(stdin_reader);
@@ -945,17 +873,43 @@ impl FakeTransport {
})
.detach();
- Ok((
- TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
- this,
- ))
+ Ok(this)
+ }
+}
+
+#[cfg(any(test, feature = "test-support"))]
+impl Transport for FakeTransport {
+ fn tcp_arguments(&self) -> Option<TcpArguments> {
+ None
+ }
+
+ fn connect(
+ &mut self,
+ ) -> Task<
+ Result<(
+ Box<dyn AsyncWrite + Unpin + Send + 'static>,
+ Box<dyn AsyncRead + Unpin + Send + 'static>,
+ )>,
+ > {
+ let result = util::maybe!({
+ Ok((
+ Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _,
+ Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _,
+ ))
+ });
+ Task::ready(result)
}
fn has_adapter_logs(&self) -> bool {
false
}
- async fn kill(&self) {}
+ fn kill(&self) {}
+
+ #[cfg(any(test, feature = "test-support"))]
+ fn as_fake(&self) -> &FakeTransport {
+ self
+ }
}
struct Child {
@@ -29,6 +29,7 @@ use dap::{
StartDebuggingRequestArgumentsRequest,
};
use futures::SinkExt;
+use futures::channel::mpsc::UnboundedSender;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, future::Shared};
use gpui::{
@@ -139,6 +140,7 @@ pub struct RunningMode {
executor: BackgroundExecutor,
is_started: bool,
has_ever_stopped: bool,
+ messages_tx: UnboundedSender<Message>,
}
fn client_source(abs_path: &Path) -> dap::Source {
@@ -163,34 +165,35 @@ impl RunningMode {
worktree: WeakEntity<Worktree>,
binary: DebugAdapterBinary,
messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
- cx: AsyncApp,
+ cx: &mut AsyncApp,
) -> Result<Self> {
- let message_handler = Box::new(move |message| {
- messages_tx.unbounded_send(message).ok();
+ let message_handler = Box::new({
+ let messages_tx = messages_tx.clone();
+ move |message| {
+ messages_tx.unbounded_send(message).ok();
+ }
});
- let client = Arc::new(
- if let Some(client) = parent_session
- .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
- .flatten()
- {
- client
- .reconnect(session_id, binary.clone(), message_handler, cx.clone())
- .await?
- } else {
- DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
- .await?
- },
- );
+ let client = if let Some(client) = parent_session
+ .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
+ .flatten()
+ {
+ client
+ .create_child_connection(session_id, binary.clone(), message_handler, cx)
+ .await?
+ } else {
+ DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
+ };
Ok(Self {
- client,
+ client: Arc::new(client),
worktree,
tmp_breakpoint: None,
binary,
executor: cx.background_executor().clone(),
is_started: false,
has_ever_stopped: false,
+ messages_tx,
})
}
@@ -481,6 +484,22 @@ impl RunningMode {
})
}
+ fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
+ let client = self.client.clone();
+ let messages_tx = self.messages_tx.clone();
+ let message_handler = Box::new(move |message| {
+ messages_tx.unbounded_send(message).ok();
+ });
+ if client.should_reconnect_for_ssh() {
+ Some(cx.spawn(async move |cx| {
+ client.connect(message_handler, cx).await?;
+ anyhow::Ok(())
+ }))
+ } else {
+ None
+ }
+ }
+
fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
where
<R::DapRequest as dap::requests::Request>::Response: 'static,
@@ -855,7 +874,7 @@ impl Session {
worktree.downgrade(),
binary.clone(),
message_tx,
- cx.clone(),
+ cx,
)
.await?;
this.update(cx, |this, cx| {
@@ -1131,35 +1150,58 @@ impl Session {
pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
let adapter_id = self.adapter().to_string();
let request = Initialize { adapter_id };
- match &self.mode {
- Mode::Running(local_mode) => {
- let capabilities = local_mode.request(request);
- cx.spawn(async move |this, cx| {
- let capabilities = capabilities.await?;
- this.update(cx, |session, cx| {
- session.capabilities = capabilities;
- let filters = session
- .capabilities
- .exception_breakpoint_filters
- .clone()
- .unwrap_or_default();
- for filter in filters {
- let default = filter.default.unwrap_or_default();
- session
- .exception_breakpoints
- .entry(filter.filter.clone())
- .or_insert_with(|| (filter, default));
- }
- cx.emit(SessionEvent::CapabilitiesLoaded);
- })?;
- Ok(())
- })
- }
- Mode::Building => Task::ready(Err(anyhow!(
+ let Mode::Running(running) = &self.mode else {
+ return Task::ready(Err(anyhow!(
"Cannot send initialize request, task still building"
- ))),
- }
+ )));
+ };
+ let mut response = running.request(request.clone());
+
+ cx.spawn(async move |this, cx| {
+ loop {
+ let capabilities = response.await;
+ match capabilities {
+ Err(e) => {
+ let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
+ this.as_running()
+ .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
+ }) else {
+ return Err(e);
+ };
+ log::info!("Failed to connect to debug adapter: {}, retrying...", e);
+ reconnect.await?;
+
+ let Ok(Some(r)) = this.update(cx, |this, _| {
+ this.as_running()
+ .map(|running| running.request(request.clone()))
+ }) else {
+ return Err(e);
+ };
+ response = r
+ }
+ Ok(capabilities) => {
+ this.update(cx, |session, cx| {
+ session.capabilities = capabilities;
+ let filters = session
+ .capabilities
+ .exception_breakpoint_filters
+ .clone()
+ .unwrap_or_default();
+ for filter in filters {
+ let default = filter.default.unwrap_or_default();
+ session
+ .exception_breakpoints
+ .entry(filter.filter.clone())
+ .or_insert_with(|| (filter, default));
+ }
+ cx.emit(SessionEvent::CapabilitiesLoaded);
+ })?;
+ return Ok(());
+ }
+ }
+ }
+ })
}
pub(super) fn initialize_sequence(