1use anyhow::{Context as _, Result, anyhow, bail};
2#[cfg(any(test, feature = "test-support"))]
3use async_pipe::{PipeReader, PipeWriter};
4use dap_types::{
5 ErrorResponse,
6 messages::{Message, Response},
7};
8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
10use parking_lot::Mutex;
11use proto::ErrorExt;
12use settings::Settings as _;
13use smallvec::SmallVec;
14use smol::{
15 channel::{Receiver, Sender, unbounded},
16 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
17 net::{TcpListener, TcpStream},
18};
19use std::{
20 collections::HashMap,
21 net::{Ipv4Addr, SocketAddrV4},
22 process::Stdio,
23 sync::Arc,
24 time::Duration,
25};
26use task::TcpArgumentsTemplate;
27use util::ConnectionResult;
28
29use crate::{
30 adapters::{DebugAdapterBinary, TcpArguments},
31 client::DapMessageHandler,
32 debugger_settings::DebuggerSettings,
33};
34
35pub(crate) type IoMessage = str;
36pub(crate) type Command = str;
37pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
38
39#[derive(PartialEq, Eq, Clone, Copy)]
40pub enum LogKind {
41 Adapter,
42 Rpc,
43}
44
45#[derive(Clone, Copy)]
46pub enum IoKind {
47 StdIn,
48 StdOut,
49 StdErr,
50}
51
52type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
53type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
54
55pub trait Transport: Send + Sync {
56 fn has_adapter_logs(&self) -> bool;
57 fn tcp_arguments(&self) -> Option<TcpArguments>;
58 fn connect(
59 &mut self,
60 ) -> Task<
61 Result<(
62 Box<dyn AsyncWrite + Unpin + Send + 'static>,
63 Box<dyn AsyncRead + Unpin + Send + 'static>,
64 )>,
65 >;
66 fn kill(&mut self);
67 #[cfg(any(test, feature = "test-support"))]
68 fn as_fake(&self) -> &FakeTransport {
69 unreachable!()
70 }
71}
72
73async fn start(
74 binary: &DebugAdapterBinary,
75 log_handlers: LogHandlers,
76 cx: &mut AsyncApp,
77) -> Result<Box<dyn Transport>> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return Ok(Box::new(FakeTransport::start(cx).await?));
81 }
82
83 if binary.connection.is_some() {
84 Ok(Box::new(
85 TcpTransport::start(binary, log_handlers, cx).await?,
86 ))
87 } else {
88 Ok(Box::new(
89 StdioTransport::start(binary, log_handlers, cx).await?,
90 ))
91 }
92}
93
94pub(crate) struct TransportDelegate {
95 log_handlers: LogHandlers,
96 pub(crate) pending_requests: Requests,
97 pub(crate) transport: Mutex<Box<dyn Transport>>,
98 pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
99 tasks: Mutex<Vec<Task<()>>>,
100}
101
102impl Drop for TransportDelegate {
103 fn drop(&mut self) {
104 self.transport.lock().kill()
105 }
106}
107
108impl TransportDelegate {
109 pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
110 let log_handlers: LogHandlers = Default::default();
111 let transport = start(binary, log_handlers.clone(), cx).await?;
112 Ok(Self {
113 transport: Mutex::new(transport),
114 log_handlers,
115 server_tx: Default::default(),
116 pending_requests: Default::default(),
117 tasks: Default::default(),
118 })
119 }
120
121 pub async fn connect(
122 &self,
123 message_handler: DapMessageHandler,
124 cx: &mut AsyncApp,
125 ) -> Result<()> {
126 let (server_tx, client_rx) = unbounded::<Message>();
127 self.tasks.lock().clear();
128
129 let log_dap_communications =
130 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
131 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
132 .unwrap_or(false);
133
134 let connect = self.transport.lock().connect();
135 let (input, output) = connect.await?;
136
137 let log_handler = if log_dap_communications {
138 Some(self.log_handlers.clone())
139 } else {
140 None
141 };
142
143 let pending_requests = self.pending_requests.clone();
144 let output_log_handler = log_handler.clone();
145 {
146 let mut tasks = self.tasks.lock();
147 tasks.push(cx.background_spawn(async move {
148 match Self::recv_from_server(
149 output,
150 message_handler,
151 pending_requests.clone(),
152 output_log_handler,
153 )
154 .await
155 {
156 Ok(()) => {
157 pending_requests.lock().drain().for_each(|(_, request)| {
158 request
159 .send(Err(anyhow!("debugger shutdown unexpectedly")))
160 .ok();
161 });
162 }
163 Err(e) => {
164 pending_requests.lock().drain().for_each(|(_, request)| {
165 request.send(Err(e.cloned())).ok();
166 });
167 }
168 }
169 }));
170
171 tasks.push(cx.background_spawn(async move {
172 match Self::send_to_server(input, client_rx, log_handler).await {
173 Ok(()) => {}
174 Err(e) => log::error!("Error handling debugger input: {e}"),
175 }
176 }));
177 }
178
179 {
180 let mut lock = self.server_tx.lock().await;
181 *lock = Some(server_tx.clone());
182 }
183
184 Ok(())
185 }
186
187 pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
188 self.transport.lock().tcp_arguments()
189 }
190
191 pub(crate) fn add_pending_request(
192 &self,
193 sequence_id: u64,
194 request: oneshot::Sender<Result<Response>>,
195 ) {
196 let mut pending_requests = self.pending_requests.lock();
197 pending_requests.insert(sequence_id, request);
198 }
199
200 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
201 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
202 server_tx.send(message).await.context("sending message")
203 } else {
204 anyhow::bail!("Server tx already dropped")
205 }
206 }
207
208 async fn handle_adapter_log(
209 stdout: impl AsyncRead + Unpin + Send + 'static,
210 iokind: IoKind,
211 log_handlers: LogHandlers,
212 ) {
213 let mut reader = BufReader::new(stdout);
214 let mut line = String::new();
215
216 loop {
217 line.truncate(0);
218
219 match reader.read_line(&mut line).await {
220 Ok(0) => break,
221 Ok(_) => {}
222 Err(e) => {
223 log::debug!("handle_adapter_log: {}", e);
224 break;
225 }
226 }
227
228 for (kind, handler) in log_handlers.lock().iter_mut() {
229 if matches!(kind, LogKind::Adapter) {
230 handler(iokind, None, line.as_str());
231 }
232 }
233 }
234 }
235
236 fn build_rpc_message(message: String) -> String {
237 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
238 }
239
240 async fn send_to_server<Stdin>(
241 mut server_stdin: Stdin,
242 client_rx: Receiver<Message>,
243 log_handlers: Option<LogHandlers>,
244 ) -> Result<()>
245 where
246 Stdin: AsyncWrite + Unpin + Send + 'static,
247 {
248 let result = loop {
249 match client_rx.recv().await {
250 Ok(message) => {
251 let command = match &message {
252 Message::Request(request) => Some(request.command.as_str()),
253 Message::Response(response) => Some(response.command.as_str()),
254 _ => None,
255 };
256
257 let message = match serde_json::to_string(&message) {
258 Ok(message) => message,
259 Err(e) => break Err(e.into()),
260 };
261
262 if let Some(log_handlers) = log_handlers.as_ref() {
263 for (kind, log_handler) in log_handlers.lock().iter_mut() {
264 if matches!(kind, LogKind::Rpc) {
265 log_handler(IoKind::StdIn, command, &message);
266 }
267 }
268 }
269
270 if let Err(e) = server_stdin
271 .write_all(Self::build_rpc_message(message).as_bytes())
272 .await
273 {
274 break Err(e.into());
275 }
276
277 if let Err(e) = server_stdin.flush().await {
278 break Err(e.into());
279 }
280 }
281 Err(error) => break Err(error.into()),
282 }
283 };
284
285 log::debug!("Handle adapter input dropped");
286
287 result
288 }
289
290 async fn recv_from_server<Stdout>(
291 server_stdout: Stdout,
292 mut message_handler: DapMessageHandler,
293 pending_requests: Requests,
294 log_handlers: Option<LogHandlers>,
295 ) -> Result<()>
296 where
297 Stdout: AsyncRead + Unpin + Send + 'static,
298 {
299 let mut recv_buffer = String::new();
300 let mut reader = BufReader::new(server_stdout);
301
302 let result = loop {
303 match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
304 .await
305 {
306 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
307 ConnectionResult::ConnectionReset => {
308 log::info!("Debugger closed the connection");
309 return Ok(());
310 }
311 ConnectionResult::Result(Ok(Message::Response(res))) => {
312 let tx = pending_requests.lock().remove(&res.request_seq);
313 if let Some(tx) = tx {
314 if let Err(e) = tx.send(Self::process_response(res)) {
315 log::trace!("Did not send response `{:?}` for a cancelled", e);
316 }
317 } else {
318 message_handler(Message::Response(res))
319 }
320 }
321 ConnectionResult::Result(Ok(message)) => message_handler(message),
322 ConnectionResult::Result(Err(e)) => break Err(e),
323 }
324 };
325
326 log::debug!("Handle adapter output dropped");
327
328 result
329 }
330
331 fn process_response(response: Response) -> Result<Response> {
332 if response.success {
333 Ok(response)
334 } else {
335 if let Some(error_message) = response
336 .body
337 .clone()
338 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
339 .and_then(|response| response.error.map(|msg| msg.format))
340 .or_else(|| response.message.clone())
341 {
342 anyhow::bail!(error_message);
343 };
344
345 anyhow::bail!(
346 "Received error response from adapter. Response: {:?}",
347 response
348 );
349 }
350 }
351
352 async fn receive_server_message<Stdout>(
353 reader: &mut BufReader<Stdout>,
354 buffer: &mut String,
355 log_handlers: Option<&LogHandlers>,
356 ) -> ConnectionResult<Message>
357 where
358 Stdout: AsyncRead + Unpin + Send + 'static,
359 {
360 let mut content_length = None;
361 loop {
362 buffer.truncate(0);
363 match reader.read_line(buffer).await {
364 Ok(0) => return ConnectionResult::ConnectionReset,
365 Ok(_) => {}
366 Err(e) => return ConnectionResult::Result(Err(e.into())),
367 };
368
369 if buffer == "\r\n" {
370 break;
371 }
372
373 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
374 match value.parse().context("invalid content length") {
375 Ok(length) => content_length = Some(length),
376 Err(e) => return ConnectionResult::Result(Err(e)),
377 }
378 }
379 }
380
381 let content_length = match content_length.context("missing content length") {
382 Ok(length) => length,
383 Err(e) => return ConnectionResult::Result(Err(e)),
384 };
385
386 let mut content = vec![0; content_length];
387 if let Err(e) = reader
388 .read_exact(&mut content)
389 .await
390 .with_context(|| "reading after a loop")
391 {
392 return ConnectionResult::Result(Err(e));
393 }
394
395 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
396 Ok(str) => str,
397 Err(e) => return ConnectionResult::Result(Err(e)),
398 };
399
400 let message =
401 serde_json::from_str::<Message>(message_str).context("deserializing server message");
402
403 if let Some(log_handlers) = log_handlers {
404 let command = match &message {
405 Ok(Message::Request(request)) => Some(request.command.as_str()),
406 Ok(Message::Response(response)) => Some(response.command.as_str()),
407 _ => None,
408 };
409
410 for (kind, log_handler) in log_handlers.lock().iter_mut() {
411 if matches!(kind, LogKind::Rpc) {
412 log_handler(IoKind::StdOut, command, message_str);
413 }
414 }
415 }
416
417 ConnectionResult::Result(message)
418 }
419
420 pub fn has_adapter_logs(&self) -> bool {
421 self.transport.lock().has_adapter_logs()
422 }
423
424 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
425 where
426 F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
427 {
428 let mut log_handlers = self.log_handlers.lock();
429 log_handlers.push((kind, Box::new(f)));
430 }
431}
432
433pub struct TcpTransport {
434 executor: BackgroundExecutor,
435 pub port: u16,
436 pub host: Ipv4Addr,
437 pub timeout: u64,
438 process: Arc<Mutex<Option<Child>>>,
439 _stderr_task: Option<Task<()>>,
440 _stdout_task: Option<Task<()>>,
441}
442
443impl TcpTransport {
444 /// Get an open port to use with the tcp client when not supplied by debug config
445 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
446 if let Some(port) = host.port {
447 Ok(port)
448 } else {
449 Self::unused_port(host.host()).await
450 }
451 }
452
453 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
454 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
455 .await?
456 .local_addr()?
457 .port())
458 }
459
460 async fn start(
461 binary: &DebugAdapterBinary,
462 log_handlers: LogHandlers,
463 cx: &mut AsyncApp,
464 ) -> Result<Self> {
465 let connection_args = binary
466 .connection
467 .as_ref()
468 .context("No connection arguments provided")?;
469
470 let host = connection_args.host;
471 let port = connection_args.port;
472
473 let mut process = None;
474 let mut stdout_task = None;
475 let mut stderr_task = None;
476
477 if let Some(command) = &binary.command {
478 let mut command = util::command::new_std_command(&command);
479
480 if let Some(cwd) = &binary.cwd {
481 command.current_dir(cwd);
482 }
483
484 command.args(&binary.arguments);
485 command.envs(&binary.envs);
486
487 let mut p = Child::spawn(command, Stdio::null())
488 .with_context(|| "failed to start debug adapter.")?;
489
490 stdout_task = p.stdout.take().map(|stdout| {
491 cx.background_executor()
492 .spawn(TransportDelegate::handle_adapter_log(
493 stdout,
494 IoKind::StdOut,
495 log_handlers.clone(),
496 ))
497 });
498 stderr_task = p.stderr.take().map(|stderr| {
499 cx.background_executor()
500 .spawn(TransportDelegate::handle_adapter_log(
501 stderr,
502 IoKind::StdErr,
503 log_handlers,
504 ))
505 });
506 process = Some(p);
507 };
508
509 let timeout = connection_args.timeout.unwrap_or_else(|| {
510 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
511 .unwrap_or(20000u64)
512 });
513
514 log::info!(
515 "Debug adapter has connected to TCP server {}:{}",
516 host,
517 port
518 );
519
520 let this = Self {
521 executor: cx.background_executor().clone(),
522 port,
523 host,
524 process: Arc::new(Mutex::new(process)),
525 timeout,
526 _stdout_task: stdout_task,
527 _stderr_task: stderr_task,
528 };
529
530 Ok(this)
531 }
532}
533
534impl Transport for TcpTransport {
535 fn has_adapter_logs(&self) -> bool {
536 true
537 }
538
539 fn kill(&mut self) {
540 if let Some(process) = &mut *self.process.lock() {
541 process.kill();
542 }
543 }
544
545 fn tcp_arguments(&self) -> Option<TcpArguments> {
546 Some(TcpArguments {
547 host: self.host,
548 port: self.port,
549 timeout: Some(self.timeout),
550 })
551 }
552
553 fn connect(
554 &mut self,
555 ) -> Task<
556 Result<(
557 Box<dyn AsyncWrite + Unpin + Send + 'static>,
558 Box<dyn AsyncRead + Unpin + Send + 'static>,
559 )>,
560 > {
561 let executor = self.executor.clone();
562 let timeout = self.timeout;
563 let address = SocketAddrV4::new(self.host, self.port);
564 let process = self.process.clone();
565 executor.clone().spawn(async move {
566 select! {
567 _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
568 anyhow::bail!("Connection to TCP DAP timeout {address}");
569 },
570 result = executor.clone().spawn(async move {
571 loop {
572 match TcpStream::connect(address).await {
573 Ok(stream) => {
574 let (read, write) = stream.split();
575 return Ok((Box::new(write) as _, Box::new(read) as _))
576 },
577 Err(_) => {
578 let has_process = process.lock().is_some();
579 if has_process {
580 let status = process.lock().as_mut().unwrap().try_status();
581 if let Ok(Some(_)) = status {
582 let process = process.lock().take().unwrap().into_inner();
583 let output = process.output().await?;
584 let output = if output.stderr.is_empty() {
585 String::from_utf8_lossy(&output.stdout).to_string()
586 } else {
587 String::from_utf8_lossy(&output.stderr).to_string()
588 };
589 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
590 }
591 }
592
593 executor.timer(Duration::from_millis(100)).await;
594 }
595 }
596 }
597 }).fuse() => result
598 }
599 })
600 }
601}
602
603impl Drop for TcpTransport {
604 fn drop(&mut self) {
605 if let Some(mut p) = self.process.lock().take() {
606 p.kill()
607 }
608 }
609}
610
611pub struct StdioTransport {
612 process: Mutex<Option<Child>>,
613 _stderr_task: Option<Task<()>>,
614}
615
616impl StdioTransport {
617 // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
618 async fn start(
619 binary: &DebugAdapterBinary,
620 log_handlers: LogHandlers,
621 cx: &mut AsyncApp,
622 ) -> Result<Self> {
623 let Some(binary_command) = &binary.command else {
624 bail!(
625 "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
626 );
627 };
628 let mut command = util::command::new_std_command(&binary_command);
629
630 if let Some(cwd) = &binary.cwd {
631 command.current_dir(cwd);
632 }
633
634 command.args(&binary.arguments);
635 command.envs(&binary.envs);
636
637 let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
638 format!(
639 "failed to spawn command `{} {}`.",
640 binary_command,
641 binary.arguments.join(" ")
642 )
643 })?;
644
645 let err_task = process.stderr.take().map(|stderr| {
646 cx.background_spawn(TransportDelegate::handle_adapter_log(
647 stderr,
648 IoKind::StdErr,
649 log_handlers,
650 ))
651 });
652
653 let process = Mutex::new(Some(process));
654
655 Ok(Self {
656 process,
657 _stderr_task: err_task,
658 })
659 }
660}
661
662impl Transport for StdioTransport {
663 fn has_adapter_logs(&self) -> bool {
664 false
665 }
666
667 fn kill(&mut self) {
668 if let Some(process) = &mut *self.process.lock() {
669 process.kill();
670 }
671 }
672
673 fn connect(
674 &mut self,
675 ) -> Task<
676 Result<(
677 Box<dyn AsyncWrite + Unpin + Send + 'static>,
678 Box<dyn AsyncRead + Unpin + Send + 'static>,
679 )>,
680 > {
681 let result = util::maybe!({
682 let mut guard = self.process.lock();
683 let process = guard.as_mut().context("oops")?;
684 Ok((
685 Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
686 Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
687 ))
688 });
689 Task::ready(result)
690 }
691
692 fn tcp_arguments(&self) -> Option<TcpArguments> {
693 None
694 }
695}
696
697impl Drop for StdioTransport {
698 fn drop(&mut self) {
699 if let Some(process) = &mut *self.process.lock() {
700 process.kill();
701 }
702 }
703}
704
705#[cfg(any(test, feature = "test-support"))]
706type RequestHandler =
707 Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
708
709#[cfg(any(test, feature = "test-support"))]
710type ResponseHandler = Box<dyn Send + Fn(Response)>;
711
712#[cfg(any(test, feature = "test-support"))]
713pub struct FakeTransport {
714 // for sending fake response back from adapter side
715 request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
716 // for reverse request responses
717 response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
718
719 stdin_writer: Option<PipeWriter>,
720 stdout_reader: Option<PipeReader>,
721 message_handler: Option<Task<Result<()>>>,
722}
723
724#[cfg(any(test, feature = "test-support"))]
725impl FakeTransport {
726 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
727 where
728 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
729 {
730 self.request_handlers.lock().insert(
731 R::COMMAND,
732 Box::new(move |seq, args| {
733 let result = handler(seq, serde_json::from_value(args).unwrap());
734 let response = match result {
735 Ok(response) => Response {
736 seq: seq + 1,
737 request_seq: seq,
738 success: true,
739 command: R::COMMAND.into(),
740 body: Some(serde_json::to_value(response).unwrap()),
741 message: None,
742 },
743 Err(response) => Response {
744 seq: seq + 1,
745 request_seq: seq,
746 success: false,
747 command: R::COMMAND.into(),
748 body: Some(serde_json::to_value(response).unwrap()),
749 message: None,
750 },
751 };
752 response
753 }),
754 );
755 }
756
757 pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
758 where
759 F: 'static + Send + Fn(Response),
760 {
761 self.response_handlers
762 .lock()
763 .insert(R::COMMAND, Box::new(handler));
764 }
765
766 async fn start(cx: &mut AsyncApp) -> Result<Self> {
767 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
768 use serde_json::json;
769
770 let (stdin_writer, stdin_reader) = async_pipe::pipe();
771 let (stdout_writer, stdout_reader) = async_pipe::pipe();
772
773 let mut this = Self {
774 request_handlers: Arc::new(Mutex::new(HashMap::default())),
775 response_handlers: Arc::new(Mutex::new(HashMap::default())),
776 stdin_writer: Some(stdin_writer),
777 stdout_reader: Some(stdout_reader),
778 message_handler: None,
779 };
780
781 let request_handlers = this.request_handlers.clone();
782 let response_handlers = this.response_handlers.clone();
783 let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
784
785 this.message_handler = Some(cx.background_spawn(async move {
786 let mut reader = BufReader::new(stdin_reader);
787 let mut buffer = String::new();
788
789 loop {
790 match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
791 .await
792 {
793 ConnectionResult::Timeout => {
794 anyhow::bail!("Timed out when connecting to debugger");
795 }
796 ConnectionResult::ConnectionReset => {
797 log::info!("Debugger closed the connection");
798 break Ok(());
799 }
800 ConnectionResult::Result(Err(e)) => break Err(e),
801 ConnectionResult::Result(Ok(message)) => {
802 match message {
803 Message::Request(request) => {
804 // redirect reverse requests to stdout writer/reader
805 if request.command == RunInTerminal::COMMAND
806 || request.command == StartDebugging::COMMAND
807 {
808 let message =
809 serde_json::to_string(&Message::Request(request)).unwrap();
810
811 let mut writer = stdout_writer.lock().await;
812 writer
813 .write_all(
814 TransportDelegate::build_rpc_message(message)
815 .as_bytes(),
816 )
817 .await
818 .unwrap();
819 writer.flush().await.unwrap();
820 } else {
821 let response = if let Some(handle) =
822 request_handlers.lock().get_mut(request.command.as_str())
823 {
824 handle(request.seq, request.arguments.unwrap_or(json!({})))
825 } else {
826 panic!("No request handler for {}", request.command);
827 };
828 let message =
829 serde_json::to_string(&Message::Response(response))
830 .unwrap();
831
832 let mut writer = stdout_writer.lock().await;
833 writer
834 .write_all(
835 TransportDelegate::build_rpc_message(message)
836 .as_bytes(),
837 )
838 .await
839 .unwrap();
840 writer.flush().await.unwrap();
841 }
842 }
843 Message::Event(event) => {
844 let message =
845 serde_json::to_string(&Message::Event(event)).unwrap();
846
847 let mut writer = stdout_writer.lock().await;
848 writer
849 .write_all(
850 TransportDelegate::build_rpc_message(message).as_bytes(),
851 )
852 .await
853 .unwrap();
854 writer.flush().await.unwrap();
855 }
856 Message::Response(response) => {
857 if let Some(handle) =
858 response_handlers.lock().get(response.command.as_str())
859 {
860 handle(response);
861 } else {
862 log::error!("No response handler for {}", response.command);
863 }
864 }
865 }
866 }
867 }
868 }
869 }));
870
871 Ok(this)
872 }
873}
874
875#[cfg(any(test, feature = "test-support"))]
876impl Transport for FakeTransport {
877 fn tcp_arguments(&self) -> Option<TcpArguments> {
878 None
879 }
880
881 fn connect(
882 &mut self,
883 ) -> Task<
884 Result<(
885 Box<dyn AsyncWrite + Unpin + Send + 'static>,
886 Box<dyn AsyncRead + Unpin + Send + 'static>,
887 )>,
888 > {
889 let result = util::maybe!({
890 Ok((
891 Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _,
892 Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _,
893 ))
894 });
895 Task::ready(result)
896 }
897
898 fn has_adapter_logs(&self) -> bool {
899 false
900 }
901
902 fn kill(&mut self) {
903 self.message_handler.take();
904 }
905
906 #[cfg(any(test, feature = "test-support"))]
907 fn as_fake(&self) -> &FakeTransport {
908 self
909 }
910}
911
912struct Child {
913 process: smol::process::Child,
914}
915
916impl std::ops::Deref for Child {
917 type Target = smol::process::Child;
918
919 fn deref(&self) -> &Self::Target {
920 &self.process
921 }
922}
923
924impl std::ops::DerefMut for Child {
925 fn deref_mut(&mut self) -> &mut Self::Target {
926 &mut self.process
927 }
928}
929
930impl Child {
931 fn into_inner(self) -> smol::process::Child {
932 self.process
933 }
934
935 #[cfg(not(windows))]
936 fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
937 util::set_pre_exec_to_start_new_session(&mut command);
938 let process = smol::process::Command::from(command)
939 .stdin(stdin)
940 .stdout(Stdio::piped())
941 .stderr(Stdio::piped())
942 .spawn()?;
943 Ok(Self { process })
944 }
945
946 #[cfg(windows)]
947 fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
948 // TODO(windows): create a job object and add the child process handle to it,
949 // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
950 let process = smol::process::Command::from(command)
951 .stdin(stdin)
952 .stdout(Stdio::piped())
953 .stderr(Stdio::piped())
954 .spawn()?;
955 Ok(Self { process })
956 }
957
958 #[cfg(not(windows))]
959 fn kill(&mut self) {
960 let pid = self.process.id();
961 unsafe {
962 libc::killpg(pid as i32, libc::SIGKILL);
963 }
964 }
965
966 #[cfg(windows)]
967 fn kill(&mut self) {
968 // TODO(windows): terminate the job object in kill
969 let _ = self.process.kill();
970 }
971}