1use anyhow::{Context as _, Result, anyhow, bail};
2#[cfg(any(test, feature = "test-support"))]
3use async_pipe::{PipeReader, PipeWriter};
4use dap_types::{
5 ErrorResponse,
6 messages::{Message, Response},
7};
8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
10use parking_lot::Mutex;
11use proto::ErrorExt;
12use settings::Settings as _;
13use smallvec::SmallVec;
14use smol::{
15 channel::{Receiver, Sender, unbounded},
16 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
17 net::{TcpListener, TcpStream},
18};
19use std::{
20 collections::HashMap,
21 net::{Ipv4Addr, SocketAddrV4},
22 process::Stdio,
23 sync::Arc,
24 time::Duration,
25};
26use task::TcpArgumentsTemplate;
27use util::ConnectionResult;
28
29use crate::{
30 adapters::{DebugAdapterBinary, TcpArguments},
31 client::DapMessageHandler,
32 debugger_settings::DebuggerSettings,
33};
34
35pub(crate) type IoMessage = str;
36pub(crate) type Command = str;
37pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
38
39#[derive(PartialEq, Eq, Clone, Copy)]
40pub enum LogKind {
41 Adapter,
42 Rpc,
43}
44
45#[derive(Clone, Copy)]
46pub enum IoKind {
47 StdIn,
48 StdOut,
49 StdErr,
50}
51
52type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
53
54pub trait Transport: Send + Sync {
55 fn has_adapter_logs(&self) -> bool;
56 fn tcp_arguments(&self) -> Option<TcpArguments>;
57 fn connect(
58 &mut self,
59 ) -> Task<
60 Result<(
61 Box<dyn AsyncWrite + Unpin + Send + 'static>,
62 Box<dyn AsyncRead + Unpin + Send + 'static>,
63 )>,
64 >;
65 fn kill(&mut self);
66 #[cfg(any(test, feature = "test-support"))]
67 fn as_fake(&self) -> &FakeTransport {
68 unreachable!()
69 }
70}
71
72async fn start(
73 binary: &DebugAdapterBinary,
74 log_handlers: LogHandlers,
75 cx: &mut AsyncApp,
76) -> Result<Box<dyn Transport>> {
77 #[cfg(any(test, feature = "test-support"))]
78 if cfg!(any(test, feature = "test-support")) {
79 return Ok(Box::new(FakeTransport::start(cx).await?));
80 }
81
82 if binary.connection.is_some() {
83 Ok(Box::new(
84 TcpTransport::start(binary, log_handlers, cx).await?,
85 ))
86 } else {
87 Ok(Box::new(
88 StdioTransport::start(binary, log_handlers, cx).await?,
89 ))
90 }
91}
92
93pub(crate) struct TransportDelegate {
94 log_handlers: LogHandlers,
95 // TODO this should really be some kind of associative channel
96 pub(crate) pending_requests:
97 Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
98 pub(crate) transport: Mutex<Box<dyn Transport>>,
99 pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
100 tasks: Mutex<Vec<Task<()>>>,
101}
102
103impl TransportDelegate {
104 pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
105 let log_handlers: LogHandlers = Default::default();
106 let transport = start(binary, log_handlers.clone(), cx).await?;
107 Ok(Self {
108 transport: Mutex::new(transport),
109 log_handlers,
110 server_tx: Default::default(),
111 pending_requests: Arc::new(Mutex::new(Some(HashMap::default()))),
112 tasks: Default::default(),
113 })
114 }
115
116 pub async fn connect(
117 &self,
118 message_handler: DapMessageHandler,
119 cx: &mut AsyncApp,
120 ) -> Result<()> {
121 let (server_tx, client_rx) = unbounded::<Message>();
122 self.tasks.lock().clear();
123
124 let log_dap_communications =
125 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
126 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
127 .unwrap_or(false);
128
129 let connect = self.transport.lock().connect();
130 let (input, output) = connect.await?;
131
132 let log_handler = if log_dap_communications {
133 Some(self.log_handlers.clone())
134 } else {
135 None
136 };
137
138 let pending_requests = self.pending_requests.clone();
139 let output_log_handler = log_handler.clone();
140 {
141 let mut tasks = self.tasks.lock();
142 tasks.push(cx.background_spawn(async move {
143 match Self::recv_from_server(
144 output,
145 message_handler,
146 pending_requests.clone(),
147 output_log_handler,
148 )
149 .await
150 {
151 Ok(()) => {
152 pending_requests
153 .lock()
154 .take()
155 .into_iter()
156 .flatten()
157 .for_each(|(_, request)| {
158 request
159 .send(Err(anyhow!("debugger shutdown unexpectedly")))
160 .ok();
161 });
162 }
163 Err(e) => {
164 pending_requests
165 .lock()
166 .take()
167 .into_iter()
168 .flatten()
169 .for_each(|(_, request)| {
170 request.send(Err(e.cloned())).ok();
171 });
172 }
173 }
174 }));
175
176 tasks.push(cx.background_spawn(async move {
177 match Self::send_to_server(input, client_rx, log_handler).await {
178 Ok(()) => {}
179 Err(e) => log::error!("Error handling debugger input: {e}"),
180 }
181 }));
182 }
183
184 {
185 let mut lock = self.server_tx.lock().await;
186 *lock = Some(server_tx.clone());
187 }
188
189 Ok(())
190 }
191
192 pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
193 self.transport.lock().tcp_arguments()
194 }
195
196 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
197 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
198 server_tx.send(message).await.context("sending message")
199 } else {
200 anyhow::bail!("Server tx already dropped")
201 }
202 }
203
204 async fn handle_adapter_log(
205 stdout: impl AsyncRead + Unpin + Send + 'static,
206 iokind: IoKind,
207 log_handlers: LogHandlers,
208 ) {
209 let mut reader = BufReader::new(stdout);
210 let mut line = String::new();
211
212 loop {
213 line.truncate(0);
214
215 match reader.read_line(&mut line).await {
216 Ok(0) => break,
217 Ok(_) => {}
218 Err(e) => {
219 log::debug!("handle_adapter_log: {}", e);
220 break;
221 }
222 }
223
224 for (kind, handler) in log_handlers.lock().iter_mut() {
225 if matches!(kind, LogKind::Adapter) {
226 handler(iokind, None, line.as_str());
227 }
228 }
229 }
230 }
231
232 fn build_rpc_message(message: String) -> String {
233 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
234 }
235
236 async fn send_to_server<Stdin>(
237 mut server_stdin: Stdin,
238 client_rx: Receiver<Message>,
239 log_handlers: Option<LogHandlers>,
240 ) -> Result<()>
241 where
242 Stdin: AsyncWrite + Unpin + Send + 'static,
243 {
244 let result = loop {
245 match client_rx.recv().await {
246 Ok(message) => {
247 let command = match &message {
248 Message::Request(request) => Some(request.command.as_str()),
249 Message::Response(response) => Some(response.command.as_str()),
250 _ => None,
251 };
252
253 let message = match serde_json::to_string(&message) {
254 Ok(message) => message,
255 Err(e) => break Err(e.into()),
256 };
257
258 if let Some(log_handlers) = log_handlers.as_ref() {
259 for (kind, log_handler) in log_handlers.lock().iter_mut() {
260 if matches!(kind, LogKind::Rpc) {
261 log_handler(IoKind::StdIn, command, &message);
262 }
263 }
264 }
265
266 if let Err(e) = server_stdin
267 .write_all(Self::build_rpc_message(message).as_bytes())
268 .await
269 {
270 break Err(e.into());
271 }
272
273 if let Err(e) = server_stdin.flush().await {
274 break Err(e.into());
275 }
276 }
277 Err(error) => break Err(error.into()),
278 }
279 };
280
281 log::debug!("Handle adapter input dropped");
282
283 result
284 }
285
286 async fn recv_from_server<Stdout>(
287 server_stdout: Stdout,
288 mut message_handler: DapMessageHandler,
289 pending_requests: Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
290 log_handlers: Option<LogHandlers>,
291 ) -> Result<()>
292 where
293 Stdout: AsyncRead + Unpin + Send + 'static,
294 {
295 let mut recv_buffer = String::new();
296 let mut reader = BufReader::new(server_stdout);
297
298 let result = loop {
299 let result =
300 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
301 .await;
302 match result {
303 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
304 ConnectionResult::ConnectionReset => {
305 log::info!("Debugger closed the connection");
306 break Ok(());
307 }
308 ConnectionResult::Result(Ok(Message::Response(res))) => {
309 let tx = pending_requests
310 .lock()
311 .as_mut()
312 .context("client is closed")?
313 .remove(&res.request_seq);
314 if let Some(tx) = tx {
315 if let Err(e) = tx.send(Self::process_response(res)) {
316 log::trace!("Did not send response `{:?}` for a cancelled", e);
317 }
318 } else {
319 message_handler(Message::Response(res))
320 }
321 }
322 ConnectionResult::Result(Ok(message)) => message_handler(message),
323 ConnectionResult::Result(Err(e)) => break Err(e),
324 }
325 };
326
327 log::debug!("Handle adapter output dropped");
328
329 result
330 }
331
332 fn process_response(response: Response) -> Result<Response> {
333 if response.success {
334 Ok(response)
335 } else {
336 if let Some(error_message) = response
337 .body
338 .clone()
339 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
340 .and_then(|response| response.error.map(|msg| msg.format))
341 .or_else(|| response.message.clone())
342 {
343 anyhow::bail!(error_message);
344 };
345
346 anyhow::bail!(
347 "Received error response from adapter. Response: {:?}",
348 response
349 );
350 }
351 }
352
353 async fn receive_server_message<Stdout>(
354 reader: &mut BufReader<Stdout>,
355 buffer: &mut String,
356 log_handlers: Option<&LogHandlers>,
357 ) -> ConnectionResult<Message>
358 where
359 Stdout: AsyncRead + Unpin + Send + 'static,
360 {
361 let mut content_length = None;
362 loop {
363 buffer.truncate(0);
364 match reader.read_line(buffer).await {
365 Ok(0) => return ConnectionResult::ConnectionReset,
366 Ok(_) => {}
367 Err(e) => return ConnectionResult::Result(Err(e.into())),
368 };
369
370 if buffer == "\r\n" {
371 break;
372 }
373
374 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
375 match value.parse().context("invalid content length") {
376 Ok(length) => content_length = Some(length),
377 Err(e) => return ConnectionResult::Result(Err(e)),
378 }
379 }
380 }
381
382 let content_length = match content_length.context("missing content length") {
383 Ok(length) => length,
384 Err(e) => return ConnectionResult::Result(Err(e)),
385 };
386
387 let mut content = vec![0; content_length];
388 if let Err(e) = reader
389 .read_exact(&mut content)
390 .await
391 .with_context(|| "reading after a loop")
392 {
393 return ConnectionResult::Result(Err(e));
394 }
395
396 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
397 Ok(str) => str,
398 Err(e) => return ConnectionResult::Result(Err(e)),
399 };
400
401 let message =
402 serde_json::from_str::<Message>(message_str).context("deserializing server message");
403
404 if let Some(log_handlers) = log_handlers {
405 let command = match &message {
406 Ok(Message::Request(request)) => Some(request.command.as_str()),
407 Ok(Message::Response(response)) => Some(response.command.as_str()),
408 _ => None,
409 };
410
411 for (kind, log_handler) in log_handlers.lock().iter_mut() {
412 if matches!(kind, LogKind::Rpc) {
413 log_handler(IoKind::StdOut, command, message_str);
414 }
415 }
416 }
417
418 ConnectionResult::Result(message)
419 }
420
421 pub fn has_adapter_logs(&self) -> bool {
422 self.transport.lock().has_adapter_logs()
423 }
424
425 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
426 where
427 F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
428 {
429 let mut log_handlers = self.log_handlers.lock();
430 log_handlers.push((kind, Box::new(f)));
431 }
432}
433
434pub struct TcpTransport {
435 executor: BackgroundExecutor,
436 pub port: u16,
437 pub host: Ipv4Addr,
438 pub timeout: u64,
439 process: Arc<Mutex<Option<Child>>>,
440 _stderr_task: Option<Task<()>>,
441 _stdout_task: Option<Task<()>>,
442}
443
444impl TcpTransport {
445 /// Get an open port to use with the tcp client when not supplied by debug config
446 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
447 if let Some(port) = host.port {
448 Ok(port)
449 } else {
450 Self::unused_port(host.host()).await
451 }
452 }
453
454 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
455 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
456 .await?
457 .local_addr()?
458 .port())
459 }
460
461 async fn start(
462 binary: &DebugAdapterBinary,
463 log_handlers: LogHandlers,
464 cx: &mut AsyncApp,
465 ) -> Result<Self> {
466 let connection_args = binary
467 .connection
468 .as_ref()
469 .context("No connection arguments provided")?;
470
471 let host = connection_args.host;
472 let port = connection_args.port;
473
474 let mut process = None;
475 let mut stdout_task = None;
476 let mut stderr_task = None;
477
478 if let Some(command) = &binary.command {
479 let mut command = util::command::new_std_command(&command);
480
481 if let Some(cwd) = &binary.cwd {
482 command.current_dir(cwd);
483 }
484
485 command.args(&binary.arguments);
486 command.envs(&binary.envs);
487
488 let mut p = Child::spawn(command, Stdio::null())
489 .with_context(|| "failed to start debug adapter.")?;
490
491 stdout_task = p.stdout.take().map(|stdout| {
492 cx.background_executor()
493 .spawn(TransportDelegate::handle_adapter_log(
494 stdout,
495 IoKind::StdOut,
496 log_handlers.clone(),
497 ))
498 });
499 stderr_task = p.stderr.take().map(|stderr| {
500 cx.background_executor()
501 .spawn(TransportDelegate::handle_adapter_log(
502 stderr,
503 IoKind::StdErr,
504 log_handlers,
505 ))
506 });
507 process = Some(p);
508 };
509
510 let timeout = connection_args.timeout.unwrap_or_else(|| {
511 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
512 .unwrap_or(20000u64)
513 });
514
515 log::info!(
516 "Debug adapter has connected to TCP server {}:{}",
517 host,
518 port
519 );
520
521 let this = Self {
522 executor: cx.background_executor().clone(),
523 port,
524 host,
525 process: Arc::new(Mutex::new(process)),
526 timeout,
527 _stdout_task: stdout_task,
528 _stderr_task: stderr_task,
529 };
530
531 Ok(this)
532 }
533}
534
535impl Transport for TcpTransport {
536 fn has_adapter_logs(&self) -> bool {
537 true
538 }
539
540 fn kill(&mut self) {
541 if let Some(process) = &mut *self.process.lock() {
542 process.kill();
543 }
544 }
545
546 fn tcp_arguments(&self) -> Option<TcpArguments> {
547 Some(TcpArguments {
548 host: self.host,
549 port: self.port,
550 timeout: Some(self.timeout),
551 })
552 }
553
554 fn connect(
555 &mut self,
556 ) -> Task<
557 Result<(
558 Box<dyn AsyncWrite + Unpin + Send + 'static>,
559 Box<dyn AsyncRead + Unpin + Send + 'static>,
560 )>,
561 > {
562 let executor = self.executor.clone();
563 let timeout = self.timeout;
564 let address = SocketAddrV4::new(self.host, self.port);
565 let process = self.process.clone();
566 executor.clone().spawn(async move {
567 select! {
568 _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
569 anyhow::bail!("Connection to TCP DAP timeout {address}");
570 },
571 result = executor.clone().spawn(async move {
572 loop {
573 match TcpStream::connect(address).await {
574 Ok(stream) => {
575 let (read, write) = stream.split();
576 return Ok((Box::new(write) as _, Box::new(read) as _))
577 },
578 Err(_) => {
579 let has_process = process.lock().is_some();
580 if has_process {
581 let status = process.lock().as_mut().unwrap().try_status();
582 if let Ok(Some(_)) = status {
583 let process = process.lock().take().unwrap().into_inner();
584 let output = process.output().await?;
585 let output = if output.stderr.is_empty() {
586 String::from_utf8_lossy(&output.stdout).to_string()
587 } else {
588 String::from_utf8_lossy(&output.stderr).to_string()
589 };
590 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
591 }
592 }
593
594 executor.timer(Duration::from_millis(100)).await;
595 }
596 }
597 }
598 }).fuse() => result
599 }
600 })
601 }
602}
603
604impl Drop for TcpTransport {
605 fn drop(&mut self) {
606 if let Some(mut p) = self.process.lock().take() {
607 p.kill()
608 }
609 }
610}
611
612pub struct StdioTransport {
613 process: Mutex<Option<Child>>,
614 _stderr_task: Option<Task<()>>,
615}
616
617impl StdioTransport {
618 // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
619 async fn start(
620 binary: &DebugAdapterBinary,
621 log_handlers: LogHandlers,
622 cx: &mut AsyncApp,
623 ) -> Result<Self> {
624 let Some(binary_command) = &binary.command else {
625 bail!(
626 "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
627 );
628 };
629 let mut command = util::command::new_std_command(&binary_command);
630
631 if let Some(cwd) = &binary.cwd {
632 command.current_dir(cwd);
633 }
634
635 command.args(&binary.arguments);
636 command.envs(&binary.envs);
637
638 let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
639 format!(
640 "failed to spawn command `{} {}`.",
641 binary_command,
642 binary.arguments.join(" ")
643 )
644 })?;
645
646 let err_task = process.stderr.take().map(|stderr| {
647 cx.background_spawn(TransportDelegate::handle_adapter_log(
648 stderr,
649 IoKind::StdErr,
650 log_handlers,
651 ))
652 });
653
654 let process = Mutex::new(Some(process));
655
656 Ok(Self {
657 process,
658 _stderr_task: err_task,
659 })
660 }
661}
662
663impl Transport for StdioTransport {
664 fn has_adapter_logs(&self) -> bool {
665 false
666 }
667
668 fn kill(&mut self) {
669 if let Some(process) = &mut *self.process.lock() {
670 process.kill();
671 }
672 }
673
674 fn connect(
675 &mut self,
676 ) -> Task<
677 Result<(
678 Box<dyn AsyncWrite + Unpin + Send + 'static>,
679 Box<dyn AsyncRead + Unpin + Send + 'static>,
680 )>,
681 > {
682 let result = util::maybe!({
683 let mut guard = self.process.lock();
684 let process = guard.as_mut().context("oops")?;
685 Ok((
686 Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
687 Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
688 ))
689 });
690 Task::ready(result)
691 }
692
693 fn tcp_arguments(&self) -> Option<TcpArguments> {
694 None
695 }
696}
697
698impl Drop for StdioTransport {
699 fn drop(&mut self) {
700 if let Some(process) = &mut *self.process.lock() {
701 process.kill();
702 }
703 }
704}
705
706#[cfg(any(test, feature = "test-support"))]
707type RequestHandler =
708 Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
709
710#[cfg(any(test, feature = "test-support"))]
711type ResponseHandler = Box<dyn Send + Fn(Response)>;
712
713#[cfg(any(test, feature = "test-support"))]
714pub struct FakeTransport {
715 // for sending fake response back from adapter side
716 request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
717 // for reverse request responses
718 response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
719
720 stdin_writer: Option<PipeWriter>,
721 stdout_reader: Option<PipeReader>,
722 message_handler: Option<Task<Result<()>>>,
723}
724
725#[cfg(any(test, feature = "test-support"))]
726impl FakeTransport {
727 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
728 where
729 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
730 {
731 self.request_handlers.lock().insert(
732 R::COMMAND,
733 Box::new(move |seq, args| {
734 let result = handler(seq, serde_json::from_value(args).unwrap());
735 let response = match result {
736 Ok(response) => Response {
737 seq: seq + 1,
738 request_seq: seq,
739 success: true,
740 command: R::COMMAND.into(),
741 body: Some(serde_json::to_value(response).unwrap()),
742 message: None,
743 },
744 Err(response) => Response {
745 seq: seq + 1,
746 request_seq: seq,
747 success: false,
748 command: R::COMMAND.into(),
749 body: Some(serde_json::to_value(response).unwrap()),
750 message: None,
751 },
752 };
753 response
754 }),
755 );
756 }
757
758 pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
759 where
760 F: 'static + Send + Fn(Response),
761 {
762 self.response_handlers
763 .lock()
764 .insert(R::COMMAND, Box::new(handler));
765 }
766
767 async fn start(cx: &mut AsyncApp) -> Result<Self> {
768 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
769 use serde_json::json;
770
771 let (stdin_writer, stdin_reader) = async_pipe::pipe();
772 let (stdout_writer, stdout_reader) = async_pipe::pipe();
773
774 let mut this = Self {
775 request_handlers: Arc::new(Mutex::new(HashMap::default())),
776 response_handlers: Arc::new(Mutex::new(HashMap::default())),
777 stdin_writer: Some(stdin_writer),
778 stdout_reader: Some(stdout_reader),
779 message_handler: None,
780 };
781
782 let request_handlers = this.request_handlers.clone();
783 let response_handlers = this.response_handlers.clone();
784 let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
785
786 this.message_handler = Some(cx.background_spawn(async move {
787 let mut reader = BufReader::new(stdin_reader);
788 let mut buffer = String::new();
789
790 loop {
791 match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
792 .await
793 {
794 ConnectionResult::Timeout => {
795 anyhow::bail!("Timed out when connecting to debugger");
796 }
797 ConnectionResult::ConnectionReset => {
798 log::info!("Debugger closed the connection");
799 break Ok(());
800 }
801 ConnectionResult::Result(Err(e)) => break Err(e),
802 ConnectionResult::Result(Ok(message)) => {
803 match message {
804 Message::Request(request) => {
805 // redirect reverse requests to stdout writer/reader
806 if request.command == RunInTerminal::COMMAND
807 || request.command == StartDebugging::COMMAND
808 {
809 let message =
810 serde_json::to_string(&Message::Request(request)).unwrap();
811
812 let mut writer = stdout_writer.lock().await;
813 writer
814 .write_all(
815 TransportDelegate::build_rpc_message(message)
816 .as_bytes(),
817 )
818 .await
819 .unwrap();
820 writer.flush().await.unwrap();
821 } else {
822 let response = if let Some(handle) =
823 request_handlers.lock().get_mut(request.command.as_str())
824 {
825 handle(request.seq, request.arguments.unwrap_or(json!({})))
826 } else {
827 panic!("No request handler for {}", request.command);
828 };
829 let message =
830 serde_json::to_string(&Message::Response(response))
831 .unwrap();
832
833 let mut writer = stdout_writer.lock().await;
834 writer
835 .write_all(
836 TransportDelegate::build_rpc_message(message)
837 .as_bytes(),
838 )
839 .await
840 .unwrap();
841 writer.flush().await.unwrap();
842 }
843 }
844 Message::Event(event) => {
845 let message =
846 serde_json::to_string(&Message::Event(event)).unwrap();
847
848 let mut writer = stdout_writer.lock().await;
849 writer
850 .write_all(
851 TransportDelegate::build_rpc_message(message).as_bytes(),
852 )
853 .await
854 .unwrap();
855 writer.flush().await.unwrap();
856 }
857 Message::Response(response) => {
858 if let Some(handle) =
859 response_handlers.lock().get(response.command.as_str())
860 {
861 handle(response);
862 } else {
863 log::error!("No response handler for {}", response.command);
864 }
865 }
866 }
867 }
868 }
869 }
870 }));
871
872 Ok(this)
873 }
874}
875
876#[cfg(any(test, feature = "test-support"))]
877impl Transport for FakeTransport {
878 fn tcp_arguments(&self) -> Option<TcpArguments> {
879 None
880 }
881
882 fn connect(
883 &mut self,
884 ) -> Task<
885 Result<(
886 Box<dyn AsyncWrite + Unpin + Send + 'static>,
887 Box<dyn AsyncRead + Unpin + Send + 'static>,
888 )>,
889 > {
890 let result = util::maybe!({
891 Ok((
892 Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _,
893 Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _,
894 ))
895 });
896 Task::ready(result)
897 }
898
899 fn has_adapter_logs(&self) -> bool {
900 false
901 }
902
903 fn kill(&mut self) {
904 self.message_handler.take();
905 }
906
907 #[cfg(any(test, feature = "test-support"))]
908 fn as_fake(&self) -> &FakeTransport {
909 self
910 }
911}
912
913struct Child {
914 process: smol::process::Child,
915}
916
917impl std::ops::Deref for Child {
918 type Target = smol::process::Child;
919
920 fn deref(&self) -> &Self::Target {
921 &self.process
922 }
923}
924
925impl std::ops::DerefMut for Child {
926 fn deref_mut(&mut self) -> &mut Self::Target {
927 &mut self.process
928 }
929}
930
931impl Child {
932 fn into_inner(self) -> smol::process::Child {
933 self.process
934 }
935
936 #[cfg(not(windows))]
937 fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
938 util::set_pre_exec_to_start_new_session(&mut command);
939 let process = smol::process::Command::from(command)
940 .stdin(stdin)
941 .stdout(Stdio::piped())
942 .stderr(Stdio::piped())
943 .spawn()?;
944 Ok(Self { process })
945 }
946
947 #[cfg(windows)]
948 fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
949 // TODO(windows): create a job object and add the child process handle to it,
950 // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
951 let process = smol::process::Command::from(command)
952 .stdin(stdin)
953 .stdout(Stdio::piped())
954 .stderr(Stdio::piped())
955 .spawn()?;
956 Ok(Self { process })
957 }
958
959 #[cfg(not(windows))]
960 fn kill(&mut self) {
961 let pid = self.process.id();
962 unsafe {
963 libc::killpg(pid as i32, libc::SIGKILL);
964 }
965 }
966
967 #[cfg(windows)]
968 fn kill(&mut self) {
969 // TODO(windows): terminate the job object in kill
970 let _ = self.process.kill();
971 }
972}