1use anyhow::{Context as _, Result, anyhow, bail};
2#[cfg(any(test, feature = "test-support"))]
3use async_pipe::{PipeReader, PipeWriter};
4use dap_types::{
5 ErrorResponse,
6 messages::{Message, Response},
7};
8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
10use parking_lot::Mutex;
11use proto::ErrorExt;
12use settings::Settings as _;
13use smallvec::SmallVec;
14use smol::{
15 channel::{Receiver, Sender, unbounded},
16 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
17 net::{TcpListener, TcpStream},
18};
19use std::{
20 collections::HashMap,
21 net::{Ipv4Addr, SocketAddrV4},
22 process::Stdio,
23 sync::Arc,
24 time::Duration,
25};
26use task::TcpArgumentsTemplate;
27use util::ConnectionResult;
28
29use crate::{
30 adapters::{DebugAdapterBinary, TcpArguments},
31 client::DapMessageHandler,
32 debugger_settings::DebuggerSettings,
33};
34
/// Text of a single message or log line passed to an [`IoHandler`].
pub(crate) type IoMessage = str;
/// Name of the DAP command a logged message belongs to.
pub(crate) type Command = str;
/// Log callback: receives the stream kind, the command name (when the message
/// has one) and the message text.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
38
/// Selects which kind of output a registered [`IoHandler`] receives.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Lines read from the adapter process's stdout/stderr pipes.
    Adapter,
    /// Serialized messages exchanged with the adapter.
    Rpc,
}
44
/// Stream a logged message is attributed to when it is passed to an [`IoHandler`].
#[derive(Clone, Copy)]
pub enum IoKind {
    /// Used for messages written to the adapter.
    StdIn,
    /// Used for messages read from the adapter and for its stdout log lines.
    StdOut,
    /// Used for log lines read from the adapter's stderr.
    StdErr,
}
51
/// Requests awaiting a response, keyed by sequence id; the sender delivers the outcome.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of log handlers, each tagged with the [`LogKind`] it wants to receive.
type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
54
/// Channel over which messages are exchanged with a debug adapter.
pub trait Transport: Send + Sync {
    /// Reports whether the transport provides adapter logs (in this file: `true`
    /// for the TCP transport, `false` for the stdio and fake transports).
    fn has_adapter_logs(&self) -> bool;
    /// TCP connection parameters, or `None` for transports that do not use TCP.
    fn tcp_arguments(&self) -> Option<TcpArguments>;
    /// Establishes the connection and resolves to a `(writer, reader)` pair:
    /// the writer carries messages to the adapter, the reader yields its output.
    fn connect(
        &mut self,
    ) -> Task<
        Result<(
            Box<dyn AsyncWrite + Unpin + Send + 'static>,
            Box<dyn AsyncRead + Unpin + Send + 'static>,
        )>,
    >;
    /// Kills the adapter process backing this transport, if there is one.
    fn kill(&self);
    /// Test-only access to the underlying [`FakeTransport`]. The default
    /// implementation panics; only `FakeTransport` overrides it.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeTransport {
        unreachable!()
    }
}
72
73async fn start(
74 binary: &DebugAdapterBinary,
75 log_handlers: LogHandlers,
76 cx: &mut AsyncApp,
77) -> Result<Box<dyn Transport>> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return Ok(Box::new(FakeTransport::start(cx).await?));
81 }
82
83 if binary.connection.is_some() {
84 Ok(Box::new(
85 TcpTransport::start(binary, log_handlers, cx).await?,
86 ))
87 } else {
88 Ok(Box::new(
89 StdioTransport::start(binary, log_handlers, cx).await?,
90 ))
91 }
92}
93
/// Owns a [`Transport`] together with the state needed to route messages
/// between the client and the debug adapter.
pub(crate) struct TransportDelegate {
    /// Handlers notified about adapter log lines and RPC traffic.
    log_handlers: LogHandlers,
    /// Requests that have been sent and are still waiting for their response.
    pending_requests: Requests,
    /// The underlying transport.
    pub(crate) transport: Mutex<Box<dyn Transport>>,
    /// Sending half of the outgoing message queue; `None` until `connect`
    /// succeeds and again after `shutdown`.
    server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
    /// Background reader/writer tasks of the current connection.
    tasks: Mutex<Vec<Task<()>>>,
}
101
102impl TransportDelegate {
103 pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
104 let log_handlers: LogHandlers = Default::default();
105 let transport = start(binary, log_handlers.clone(), cx).await?;
106 Ok(Self {
107 transport: Mutex::new(transport),
108 log_handlers,
109 server_tx: Default::default(),
110 pending_requests: Default::default(),
111 tasks: Default::default(),
112 })
113 }
114
115 pub async fn connect(
116 &self,
117 message_handler: DapMessageHandler,
118 cx: &mut AsyncApp,
119 ) -> Result<()> {
120 let (server_tx, client_rx) = unbounded::<Message>();
121 self.tasks.lock().clear();
122
123 let log_dap_communications =
124 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
125 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
126 .unwrap_or(false);
127
128 let connect = self.transport.lock().connect();
129 let (input, output) = connect.await?;
130
131 let log_handler = if log_dap_communications {
132 Some(self.log_handlers.clone())
133 } else {
134 None
135 };
136
137 let pending_requests = self.pending_requests.clone();
138 let output_log_handler = log_handler.clone();
139 {
140 let mut tasks = self.tasks.lock();
141 tasks.push(cx.background_spawn(async move {
142 match Self::recv_from_server(
143 output,
144 message_handler,
145 pending_requests.clone(),
146 output_log_handler,
147 )
148 .await
149 {
150 Ok(()) => {
151 pending_requests.lock().drain().for_each(|(_, request)| {
152 request
153 .send(Err(anyhow!("debugger shutdown unexpectedly")))
154 .ok();
155 });
156 }
157 Err(e) => {
158 pending_requests.lock().drain().for_each(|(_, request)| {
159 request.send(Err(e.cloned())).ok();
160 });
161 }
162 }
163 }));
164
165 tasks.push(cx.background_spawn(async move {
166 match Self::send_to_server(input, client_rx, log_handler).await {
167 Ok(()) => {}
168 Err(e) => log::error!("Error handling debugger input: {e}"),
169 }
170 }));
171 }
172
173 {
174 let mut lock = self.server_tx.lock().await;
175 *lock = Some(server_tx.clone());
176 }
177
178 Ok(())
179 }
180
181 pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
182 self.transport.lock().tcp_arguments()
183 }
184
185 pub(crate) fn add_pending_request(
186 &self,
187 sequence_id: u64,
188 request: oneshot::Sender<Result<Response>>,
189 ) {
190 let mut pending_requests = self.pending_requests.lock();
191 pending_requests.insert(sequence_id, request);
192 }
193
194 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
195 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
196 server_tx.send(message).await.context("sending message")
197 } else {
198 anyhow::bail!("Server tx already dropped")
199 }
200 }
201
202 async fn handle_adapter_log(
203 stdout: impl AsyncRead + Unpin + Send + 'static,
204 iokind: IoKind,
205 log_handlers: LogHandlers,
206 ) {
207 let mut reader = BufReader::new(stdout);
208 let mut line = String::new();
209
210 loop {
211 line.truncate(0);
212
213 match reader.read_line(&mut line).await {
214 Ok(0) => break,
215 Ok(_) => {}
216 Err(e) => {
217 log::debug!("handle_adapter_log: {}", e);
218 break;
219 }
220 }
221
222 for (kind, handler) in log_handlers.lock().iter_mut() {
223 if matches!(kind, LogKind::Adapter) {
224 handler(iokind, None, line.as_str());
225 }
226 }
227 }
228 }
229
/// Frames `message` for the wire: a `Content-Length` header holding the
/// payload's length in bytes, a blank line, then the payload itself.
fn build_rpc_message(message: String) -> String {
    let header = format!("Content-Length: {}\r\n\r\n", message.len());
    header + &message
}
233
234 async fn send_to_server<Stdin>(
235 mut server_stdin: Stdin,
236 client_rx: Receiver<Message>,
237 log_handlers: Option<LogHandlers>,
238 ) -> Result<()>
239 where
240 Stdin: AsyncWrite + Unpin + Send + 'static,
241 {
242 let result = loop {
243 match client_rx.recv().await {
244 Ok(message) => {
245 let command = match &message {
246 Message::Request(request) => Some(request.command.as_str()),
247 Message::Response(response) => Some(response.command.as_str()),
248 _ => None,
249 };
250
251 let message = match serde_json::to_string(&message) {
252 Ok(message) => message,
253 Err(e) => break Err(e.into()),
254 };
255
256 if let Some(log_handlers) = log_handlers.as_ref() {
257 for (kind, log_handler) in log_handlers.lock().iter_mut() {
258 if matches!(kind, LogKind::Rpc) {
259 log_handler(IoKind::StdIn, command, &message);
260 }
261 }
262 }
263
264 if let Err(e) = server_stdin
265 .write_all(Self::build_rpc_message(message).as_bytes())
266 .await
267 {
268 break Err(e.into());
269 }
270
271 if let Err(e) = server_stdin.flush().await {
272 break Err(e.into());
273 }
274 }
275 Err(error) => break Err(error.into()),
276 }
277 };
278
279 log::debug!("Handle adapter input dropped");
280
281 result
282 }
283
284 async fn recv_from_server<Stdout>(
285 server_stdout: Stdout,
286 mut message_handler: DapMessageHandler,
287 pending_requests: Requests,
288 log_handlers: Option<LogHandlers>,
289 ) -> Result<()>
290 where
291 Stdout: AsyncRead + Unpin + Send + 'static,
292 {
293 let mut recv_buffer = String::new();
294 let mut reader = BufReader::new(server_stdout);
295
296 let result = loop {
297 match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
298 .await
299 {
300 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
301 ConnectionResult::ConnectionReset => {
302 log::info!("Debugger closed the connection");
303 return Ok(());
304 }
305 ConnectionResult::Result(Ok(Message::Response(res))) => {
306 let tx = pending_requests.lock().remove(&res.request_seq);
307 if let Some(tx) = tx {
308 if let Err(e) = tx.send(Self::process_response(res)) {
309 log::trace!("Did not send response `{:?}` for a cancelled", e);
310 }
311 } else {
312 message_handler(Message::Response(res))
313 }
314 }
315 ConnectionResult::Result(Ok(message)) => message_handler(message),
316 ConnectionResult::Result(Err(e)) => break Err(e),
317 }
318 };
319
320 log::debug!("Handle adapter output dropped");
321
322 result
323 }
324
325 fn process_response(response: Response) -> Result<Response> {
326 if response.success {
327 Ok(response)
328 } else {
329 if let Some(error_message) = response
330 .body
331 .clone()
332 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
333 .and_then(|response| response.error.map(|msg| msg.format))
334 .or_else(|| response.message.clone())
335 {
336 anyhow::bail!(error_message);
337 };
338
339 anyhow::bail!(
340 "Received error response from adapter. Response: {:?}",
341 response
342 );
343 }
344 }
345
346 async fn receive_server_message<Stdout>(
347 reader: &mut BufReader<Stdout>,
348 buffer: &mut String,
349 log_handlers: Option<&LogHandlers>,
350 ) -> ConnectionResult<Message>
351 where
352 Stdout: AsyncRead + Unpin + Send + 'static,
353 {
354 let mut content_length = None;
355 loop {
356 buffer.truncate(0);
357
358 match reader.read_line(buffer).await {
359 Ok(0) => return ConnectionResult::ConnectionReset,
360 Ok(_) => {}
361 Err(e) => return ConnectionResult::Result(Err(e.into())),
362 };
363
364 if buffer == "\r\n" {
365 break;
366 }
367
368 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
369 match value.parse().context("invalid content length") {
370 Ok(length) => content_length = Some(length),
371 Err(e) => return ConnectionResult::Result(Err(e)),
372 }
373 }
374 }
375
376 let content_length = match content_length.context("missing content length") {
377 Ok(length) => length,
378 Err(e) => return ConnectionResult::Result(Err(e)),
379 };
380
381 let mut content = vec![0; content_length];
382 if let Err(e) = reader
383 .read_exact(&mut content)
384 .await
385 .with_context(|| "reading after a loop")
386 {
387 return ConnectionResult::Result(Err(e));
388 }
389
390 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
391 Ok(str) => str,
392 Err(e) => return ConnectionResult::Result(Err(e)),
393 };
394
395 let message =
396 serde_json::from_str::<Message>(message_str).context("deserializing server message");
397
398 if let Some(log_handlers) = log_handlers {
399 let command = match &message {
400 Ok(Message::Request(request)) => Some(request.command.as_str()),
401 Ok(Message::Response(response)) => Some(response.command.as_str()),
402 _ => None,
403 };
404
405 for (kind, log_handler) in log_handlers.lock().iter_mut() {
406 if matches!(kind, LogKind::Rpc) {
407 log_handler(IoKind::StdOut, command, message_str);
408 }
409 }
410 }
411
412 ConnectionResult::Result(message)
413 }
414
415 pub async fn shutdown(&self) -> Result<()> {
416 log::debug!("Start shutdown client");
417
418 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
419 server_tx.close();
420 }
421
422 self.pending_requests.lock().clear();
423 self.transport.lock().kill();
424
425 log::debug!("Shutdown client completed");
426
427 anyhow::Ok(())
428 }
429
430 pub fn has_adapter_logs(&self) -> bool {
431 self.transport.lock().has_adapter_logs()
432 }
433
434 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
435 where
436 F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
437 {
438 let mut log_handlers = self.log_handlers.lock();
439 log_handlers.push((kind, Box::new(f)));
440 }
441}
442
/// Transport that talks to the debug adapter over a TCP socket, optionally
/// spawning the adapter process itself.
pub struct TcpTransport {
    /// Executor used to run the connection attempt and its timers.
    executor: BackgroundExecutor,
    /// Port the adapter is expected to listen on.
    pub port: u16,
    /// Host the adapter is expected to listen on.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds.
    pub timeout: u64,
    /// The spawned adapter process; `None` when no command was configured or
    /// after the process has been taken out.
    process: Arc<Mutex<Option<Child>>>,
    /// Keeps the stderr log-forwarding task alive.
    _stderr_task: Option<Task<()>>,
    /// Keeps the stdout log-forwarding task alive.
    _stdout_task: Option<Task<()>>,
}
452
453impl TcpTransport {
454 /// Get an open port to use with the tcp client when not supplied by debug config
455 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
456 if let Some(port) = host.port {
457 Ok(port)
458 } else {
459 Self::unused_port(host.host()).await
460 }
461 }
462
463 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
464 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
465 .await?
466 .local_addr()?
467 .port())
468 }
469
470 async fn start(
471 binary: &DebugAdapterBinary,
472 log_handlers: LogHandlers,
473 cx: &mut AsyncApp,
474 ) -> Result<Self> {
475 let connection_args = binary
476 .connection
477 .as_ref()
478 .context("No connection arguments provided")?;
479
480 let host = connection_args.host;
481 let port = connection_args.port;
482
483 let mut process = None;
484 let mut stdout_task = None;
485 let mut stderr_task = None;
486
487 if let Some(command) = &binary.command {
488 let mut command = util::command::new_std_command(&command);
489
490 if let Some(cwd) = &binary.cwd {
491 command.current_dir(cwd);
492 }
493
494 command.args(&binary.arguments);
495 command.envs(&binary.envs);
496
497 let mut p = Child::spawn(command, Stdio::null())
498 .with_context(|| "failed to start debug adapter.")?;
499
500 stdout_task = p.stdout.take().map(|stdout| {
501 cx.background_executor()
502 .spawn(TransportDelegate::handle_adapter_log(
503 stdout,
504 IoKind::StdOut,
505 log_handlers.clone(),
506 ))
507 });
508 stderr_task = p.stderr.take().map(|stderr| {
509 cx.background_executor()
510 .spawn(TransportDelegate::handle_adapter_log(
511 stderr,
512 IoKind::StdErr,
513 log_handlers,
514 ))
515 });
516 process = Some(p);
517 };
518
519 let timeout = connection_args.timeout.unwrap_or_else(|| {
520 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
521 .unwrap_or(20000u64)
522 });
523
524 log::info!(
525 "Debug adapter has connected to TCP server {}:{}",
526 host,
527 port
528 );
529
530 let this = Self {
531 executor: cx.background_executor().clone(),
532 port,
533 host,
534 process: Arc::new(Mutex::new(process)),
535 timeout,
536 _stdout_task: stdout_task,
537 _stderr_task: stderr_task,
538 };
539
540 Ok(this)
541 }
542}
543
544impl Transport for TcpTransport {
545 fn has_adapter_logs(&self) -> bool {
546 true
547 }
548
549 fn kill(&self) {
550 if let Some(process) = &mut *self.process.lock() {
551 process.kill();
552 }
553 }
554
555 fn tcp_arguments(&self) -> Option<TcpArguments> {
556 Some(TcpArguments {
557 host: self.host,
558 port: self.port,
559 timeout: Some(self.timeout),
560 })
561 }
562
563 fn connect(
564 &mut self,
565 ) -> Task<
566 Result<(
567 Box<dyn AsyncWrite + Unpin + Send + 'static>,
568 Box<dyn AsyncRead + Unpin + Send + 'static>,
569 )>,
570 > {
571 let executor = self.executor.clone();
572 let timeout = self.timeout;
573 let address = SocketAddrV4::new(self.host, self.port);
574 let process = self.process.clone();
575 executor.clone().spawn(async move {
576 select! {
577 _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
578 anyhow::bail!("Connection to TCP DAP timeout {address}");
579 },
580 result = executor.clone().spawn(async move {
581 loop {
582 match TcpStream::connect(address).await {
583 Ok(stream) => {
584 let (read, write) = stream.split();
585 return Ok((Box::new(write) as _, Box::new(read) as _))
586 },
587 Err(_) => {
588 let has_process = process.lock().is_some();
589 if has_process {
590 let status = process.lock().as_mut().unwrap().try_status();
591 if let Ok(Some(_)) = status {
592 let process = process.lock().take().unwrap().into_inner();
593 let output = process.output().await?;
594 let output = if output.stderr.is_empty() {
595 String::from_utf8_lossy(&output.stdout).to_string()
596 } else {
597 String::from_utf8_lossy(&output.stderr).to_string()
598 };
599 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
600 }
601 }
602
603 executor.timer(Duration::from_millis(100)).await;
604 }
605 }
606 }
607 }).fuse() => result
608 }
609 })
610 }
611}
612
613impl Drop for TcpTransport {
614 fn drop(&mut self) {
615 if let Some(mut p) = self.process.lock().take() {
616 p.kill();
617 }
618 }
619}
620
/// Transport that talks to the debug adapter over the stdin/stdout of a
/// spawned process.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
    /// Keeps the stderr log-forwarding task alive.
    _stderr_task: Option<Task<()>>,
}
625
626impl StdioTransport {
627 // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
628 async fn start(
629 binary: &DebugAdapterBinary,
630 log_handlers: LogHandlers,
631 cx: &mut AsyncApp,
632 ) -> Result<Self> {
633 let Some(binary_command) = &binary.command else {
634 bail!(
635 "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
636 );
637 };
638 let mut command = util::command::new_std_command(&binary_command);
639
640 if let Some(cwd) = &binary.cwd {
641 command.current_dir(cwd);
642 }
643
644 command.args(&binary.arguments);
645 command.envs(&binary.envs);
646
647 let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
648 format!(
649 "failed to spawn command `{} {}`.",
650 binary_command,
651 binary.arguments.join(" ")
652 )
653 })?;
654
655 let err_task = process.stderr.take().map(|stderr| {
656 cx.background_spawn(TransportDelegate::handle_adapter_log(
657 stderr,
658 IoKind::StdErr,
659 log_handlers,
660 ))
661 });
662
663 let process = Mutex::new(process);
664
665 Ok(Self {
666 process,
667 _stderr_task: err_task,
668 })
669 }
670}
671
672impl Transport for StdioTransport {
673 fn has_adapter_logs(&self) -> bool {
674 false
675 }
676
677 fn kill(&self) {
678 self.process.lock().kill()
679 }
680
681 fn connect(
682 &mut self,
683 ) -> Task<
684 Result<(
685 Box<dyn AsyncWrite + Unpin + Send + 'static>,
686 Box<dyn AsyncRead + Unpin + Send + 'static>,
687 )>,
688 > {
689 let mut process = self.process.lock();
690 let result = util::maybe!({
691 Ok((
692 Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
693 Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
694 ))
695 });
696 Task::ready(result)
697 }
698
699 fn tcp_arguments(&self) -> Option<TcpArguments> {
700 None
701 }
702}
703
704impl Drop for StdioTransport {
705 fn drop(&mut self) {
706 self.process.get_mut().kill();
707 }
708}
709
/// Test handler for a request: receives the request's sequence number and raw
/// JSON arguments and produces the response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Test handler invoked with a response the fake adapter receives.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
716
/// In-memory transport for tests: plays the adapter side of the protocol over
/// a pair of pipes and answers requests with the registered handlers.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,

    // Client-side pipe ends; `connect` takes them, so each is handed out once.
    stdin_writer: Option<PipeWriter>,
    stdout_reader: Option<PipeReader>,
}
727
#[cfg(any(test, feature = "test-support"))]
impl FakeTransport {
    /// Registers `handler` as the fake adapter's implementation of request `R`.
    ///
    /// The handler's `Ok`/`Err` result becomes the body of a successful or
    /// failed response, whose `seq` is the request's sequence number plus one.
    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
    where
        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
    {
        let wrapped: RequestHandler = Box::new(move |seq, args| {
            let arguments = serde_json::from_value(args).unwrap();
            let (success, body) = match handler(seq, arguments) {
                Ok(response) => (true, serde_json::to_value(response).unwrap()),
                Err(response) => (false, serde_json::to_value(response).unwrap()),
            };

            Response {
                seq: seq + 1,
                request_seq: seq,
                success,
                command: R::COMMAND.into(),
                body: Some(body),
                message: None,
            }
        });

        self.request_handlers.lock().insert(R::COMMAND, wrapped);
    }

    /// Registers `handler` to be called when the fake adapter receives a
    /// response for command `R` (i.e. the reply to a reverse request).
    pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
    where
        F: 'static + Send + Fn(Response),
    {
        let boxed: ResponseHandler = Box::new(handler);
        self.response_handlers.lock().insert(R::COMMAND, boxed);
    }

    /// Creates the fake transport and spawns the detached task that plays the
    /// adapter: it reads what the client writes, answers requests through the
    /// registered handlers and echoes events and reverse requests back.
    async fn start(cx: &mut AsyncApp) -> Result<Self> {
        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
        use serde_json::json;

        /// Frames `payload` and writes it to the client-facing pipe, then flushes.
        async fn write_framed(writer: &smol::lock::Mutex<PipeWriter>, payload: String) {
            let mut writer = writer.lock().await;
            writer
                .write_all(TransportDelegate::build_rpc_message(payload).as_bytes())
                .await
                .unwrap();
            writer.flush().await.unwrap();
        }

        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let this = Self {
            request_handlers: Arc::new(Mutex::new(HashMap::default())),
            response_handlers: Arc::new(Mutex::new(HashMap::default())),
            stdin_writer: Some(stdin_writer),
            stdout_reader: Some(stdout_reader),
        };

        let request_handlers = this.request_handlers.clone();
        let response_handlers = this.response_handlers.clone();
        let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));

        cx.background_spawn(async move {
            let mut reader = BufReader::new(stdin_reader);
            let mut buffer = String::new();

            loop {
                let received =
                    TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
                        .await;

                let message = match received {
                    ConnectionResult::Timeout => {
                        anyhow::bail!("Timed out when connecting to debugger");
                    }
                    ConnectionResult::ConnectionReset => {
                        log::info!("Debugger closed the connection");
                        break Ok(());
                    }
                    ConnectionResult::Result(Err(e)) => break Err(e),
                    ConnectionResult::Result(Ok(message)) => message,
                };

                match message {
                    Message::Request(request) => {
                        // redirect reverse requests to stdout writer/reader
                        let is_reverse_request = request.command == RunInTerminal::COMMAND
                            || request.command == StartDebugging::COMMAND;

                        let outgoing = if is_reverse_request {
                            Message::Request(request)
                        } else {
                            // Scope the handler lock so it is released before any await.
                            let response = {
                                let mut handlers = request_handlers.lock();
                                match handlers.get_mut(request.command.as_str()) {
                                    Some(handle) => {
                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
                                    }
                                    None => panic!("No request handler for {}", request.command),
                                }
                            };
                            Message::Response(response)
                        };

                        let payload = serde_json::to_string(&outgoing).unwrap();
                        write_framed(&stdout_writer, payload).await;
                    }
                    Message::Event(event) => {
                        let payload = serde_json::to_string(&Message::Event(event)).unwrap();
                        write_framed(&stdout_writer, payload).await;
                    }
                    Message::Response(response) => {
                        let handlers = response_handlers.lock();
                        match handlers.get(response.command.as_str()) {
                            Some(handle) => handle(response),
                            None => log::error!("No response handler for {}", response.command),
                        }
                    }
                }
            }
        })
        .detach();

        Ok(this)
    }
}
879
#[cfg(any(test, feature = "test-support"))]
impl Transport for FakeTransport {
    /// The fake transport has no TCP arguments.
    fn tcp_arguments(&self) -> Option<TcpArguments> {
        None
    }

    /// Hands out the client ends of the in-memory pipes. Each can be taken only
    /// once, so a second call fails with "Cannot reconnect".
    fn connect(
        &mut self,
    ) -> Task<
        Result<(
            Box<dyn AsyncWrite + Unpin + Send + 'static>,
            Box<dyn AsyncRead + Unpin + Send + 'static>,
        )>,
    > {
        let streams = util::maybe!({
            let writer = self.stdin_writer.take().context("Cannot reconnect")?;
            let reader = self.stdout_reader.take().context("Cannot reconnect")?;
            Ok((Box::new(writer) as _, Box::new(reader) as _))
        });
        Task::ready(streams)
    }

    /// The fake transport reports no adapter logs.
    fn has_adapter_logs(&self) -> bool {
        false
    }

    /// Nothing to kill: there is no real process behind the fake transport.
    fn kill(&self) {}

    /// Returns `self`, overriding the panicking default.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeTransport {
        self
    }
}
914
/// Wrapper around a spawned adapter process that provides platform-specific
/// `spawn` and `kill` implementations.
struct Child {
    /// The wrapped process handle.
    process: smol::process::Child,
}
918
/// Gives read access to the wrapped `smol::process::Child`.
impl std::ops::Deref for Child {
    type Target = smol::process::Child;

    fn deref(&self) -> &Self::Target {
        &self.process
    }
}
926
/// Gives mutable access to the wrapped `smol::process::Child` (e.g. to take
/// its stdio handles).
impl std::ops::DerefMut for Child {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.process
    }
}
932
impl Child {
    /// Unwraps the underlying process handle.
    fn into_inner(self) -> smol::process::Child {
        self.process
    }

    /// Spawns `command` with the given `stdin` and with stdout/stderr piped.
    ///
    /// NOTE(review): `set_pre_exec_to_start_new_session` presumably makes the
    /// child the leader of a new session/process group, which is what `kill`
    /// relies on — confirm against the `util` crate.
    #[cfg(not(windows))]
    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
        util::set_pre_exec_to_start_new_session(&mut command);
        let process = smol::process::Command::from(command)
            .stdin(stdin)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;
        Ok(Self { process })
    }

    /// Spawns `command` with the given `stdin` and with stdout/stderr piped.
    #[cfg(windows)]
    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
        // TODO(windows): create a job object and add the child process handle to it,
        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
        let process = smol::process::Command::from(command)
            .stdin(stdin)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;
        Ok(Self { process })
    }

    /// Sends `SIGKILL` to the process group whose id equals the child's pid.
    /// The result of `killpg` is ignored.
    #[cfg(not(windows))]
    fn kill(&mut self) {
        let pid = self.process.id();
        // SAFETY: `killpg` takes plain integer arguments and dereferences no
        // pointers. NOTE(review): this assumes the pid is still the id of the
        // child's own process group — confirm the child cannot have been
        // reaped and its id reused by the time this runs.
        unsafe {
            libc::killpg(pid as i32, libc::SIGKILL);
        }
    }

    /// Kills the child process, ignoring any error.
    #[cfg(windows)]
    fn kill(&mut self) {
        // TODO(windows): terminate the job object in kill
        let _ = self.process.kill();
    }
}