1use crate::{
2 adapters::DebugAdapterBinary,
3 transport::{IoKind, LogKind, TransportDelegate},
4};
5use anyhow::Result;
6use dap_types::{
7 messages::{Message, Response},
8 requests::Request,
9};
10use futures::channel::oneshot;
11use gpui::{AppContext, AsyncApp};
12use smol::channel::{Receiver, Sender};
13use std::{
14 hash::Hash,
15 sync::atomic::{AtomicU64, Ordering},
16};
17
/// Identifier of a debug session, stored as a `u32`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as the inner `u32`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct SessionId(pub u32);

impl SessionId {
    /// Builds a session id from its `u64` proto representation.
    ///
    /// Only the low 32 bits of `client_id` are kept; any higher bits are
    /// silently discarded by the narrowing cast.
    pub fn from_proto(client_id: u64) -> Self {
        let low_bits = client_id as u32;
        Self(low_bits)
    }

    /// Widens the id to the `u64` proto representation. This conversion is lossless.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        u64::from(raw)
    }
}
31
32/// Represents a connection to the debug adapter process, either via stdout/stdin or a socket.
33pub struct DebugAdapterClient {
34 id: SessionId,
35 sequence_count: AtomicU64,
36 binary: DebugAdapterBinary,
37 transport_delegate: TransportDelegate,
38}
39
40pub type DapMessageHandler = Box<dyn FnMut(Message) + 'static + Send + Sync>;
41
42impl DebugAdapterClient {
43 pub async fn start(
44 id: SessionId,
45 binary: DebugAdapterBinary,
46 message_handler: DapMessageHandler,
47 cx: AsyncApp,
48 ) -> Result<Self> {
49 let ((server_rx, server_tx), transport_delegate) =
50 TransportDelegate::start(&binary, cx.clone()).await?;
51 let this = Self {
52 id,
53 binary,
54 transport_delegate,
55 sequence_count: AtomicU64::new(1),
56 };
57 log::info!("Successfully connected to debug adapter");
58
59 let client_id = this.id;
60
61 // start handling events/reverse requests
62 cx.background_spawn(Self::handle_receive_messages(
63 client_id,
64 server_rx,
65 server_tx.clone(),
66 message_handler,
67 ))
68 .detach();
69
70 Ok(this)
71 }
72
73 pub async fn reconnect(
74 &self,
75 session_id: SessionId,
76 binary: DebugAdapterBinary,
77 message_handler: DapMessageHandler,
78 cx: AsyncApp,
79 ) -> Result<Self> {
80 let binary = match self.transport_delegate.transport() {
81 crate::transport::Transport::Tcp(tcp_transport) => DebugAdapterBinary {
82 command: binary.command,
83 arguments: binary.arguments,
84 envs: binary.envs,
85 cwd: binary.cwd,
86 connection: Some(crate::adapters::TcpArguments {
87 host: tcp_transport.host,
88 port: tcp_transport.port,
89 timeout: Some(tcp_transport.timeout),
90 }),
91 request_args: binary.request_args,
92 },
93 _ => self.binary.clone(),
94 };
95
96 Self::start(session_id, binary, message_handler, cx).await
97 }
98
99 async fn handle_receive_messages(
100 client_id: SessionId,
101 server_rx: Receiver<Message>,
102 client_tx: Sender<Message>,
103 mut message_handler: DapMessageHandler,
104 ) -> Result<()> {
105 let result = loop {
106 let message = match server_rx.recv().await {
107 Ok(message) => message,
108 Err(e) => break Err(e.into()),
109 };
110 match message {
111 Message::Event(ev) => {
112 log::debug!("Client {} received event `{}`", client_id.0, &ev);
113
114 message_handler(Message::Event(ev))
115 }
116 Message::Request(req) => {
117 log::debug!(
118 "Client {} received reverse request `{}`",
119 client_id.0,
120 &req.command
121 );
122
123 message_handler(Message::Request(req))
124 }
125 Message::Response(response) => {
126 log::debug!("Received response after request timeout: {:#?}", response);
127 }
128 }
129
130 smol::future::yield_now().await;
131 };
132
133 drop(client_tx);
134
135 log::debug!("Handle receive messages dropped");
136
137 result
138 }
139
140 /// Send a request to an adapter and get a response back
141 /// Note: This function will block until a response is sent back from the adapter
142 pub async fn request<R: Request>(&self, arguments: R::Arguments) -> Result<R::Response> {
143 let serialized_arguments = serde_json::to_value(arguments)?;
144
145 let (callback_tx, callback_rx) = oneshot::channel::<Result<Response>>();
146
147 let sequence_id = self.next_sequence_id();
148
149 let request = crate::messages::Request {
150 seq: sequence_id,
151 command: R::COMMAND.to_string(),
152 arguments: Some(serialized_arguments),
153 };
154 self.transport_delegate
155 .add_pending_request(sequence_id, callback_tx)
156 .await;
157
158 log::debug!(
159 "Client {} send `{}` request with sequence_id: {}",
160 self.id.0,
161 R::COMMAND,
162 sequence_id
163 );
164
165 self.send_message(Message::Request(request)).await?;
166
167 let command = R::COMMAND.to_string();
168
169 let response = callback_rx.await??;
170 log::debug!(
171 "Client {} received response for: `{}` sequence_id: {}",
172 self.id.0,
173 command,
174 sequence_id
175 );
176 match response.success {
177 true => {
178 if let Some(json) = response.body {
179 Ok(serde_json::from_value(json)?)
180 // Note: dap types configure themselves to return `None` when an empty object is received,
181 // which then fails here...
182 } else if let Ok(result) =
183 serde_json::from_value(serde_json::Value::Object(Default::default()))
184 {
185 Ok(result)
186 } else {
187 Ok(serde_json::from_value(Default::default())?)
188 }
189 }
190 false => anyhow::bail!("Request failed: {}", response.message.unwrap_or_default()),
191 }
192 }
193
194 pub async fn send_message(&self, message: Message) -> Result<()> {
195 self.transport_delegate.send_message(message).await
196 }
197
198 pub fn id(&self) -> SessionId {
199 self.id
200 }
201
202 pub fn binary(&self) -> &DebugAdapterBinary {
203 &self.binary
204 }
205
206 /// Get the next sequence id to be used in a request
207 pub fn next_sequence_id(&self) -> u64 {
208 self.sequence_count.fetch_add(1, Ordering::Relaxed)
209 }
210
211 pub async fn shutdown(&self) -> Result<()> {
212 self.transport_delegate.shutdown().await
213 }
214
215 pub fn has_adapter_logs(&self) -> bool {
216 self.transport_delegate.has_adapter_logs()
217 }
218
219 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
220 where
221 F: 'static + Send + FnMut(IoKind, &str),
222 {
223 self.transport_delegate.add_log_handler(f, kind);
224 }
225
226 #[cfg(any(test, feature = "test-support"))]
227 pub fn on_request<R: dap_types::requests::Request, F>(&self, handler: F)
228 where
229 F: 'static
230 + Send
231 + FnMut(u64, R::Arguments) -> Result<R::Response, dap_types::ErrorResponse>,
232 {
233 let transport = self.transport_delegate.transport().as_fake();
234 transport.on_request::<R, F>(handler);
235 }
236
237 #[cfg(any(test, feature = "test-support"))]
238 pub async fn fake_reverse_request<R: dap_types::requests::Request>(&self, args: R::Arguments) {
239 self.send_message(Message::Request(dap_types::messages::Request {
240 seq: self.sequence_count.load(Ordering::Relaxed),
241 command: R::COMMAND.into(),
242 arguments: serde_json::to_value(args).ok(),
243 }))
244 .await
245 .unwrap();
246 }
247
248 #[cfg(any(test, feature = "test-support"))]
249 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
250 where
251 F: 'static + Send + Fn(Response),
252 {
253 let transport = self.transport_delegate.transport().as_fake();
254 transport.on_response::<R, F>(handler).await;
255 }
256
257 #[cfg(any(test, feature = "test-support"))]
258 pub async fn fake_event(&self, event: dap_types::messages::Events) {
259 self.send_message(Message::Event(Box::new(event)))
260 .await
261 .unwrap();
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{client::DebugAdapterClient, debugger_settings::DebuggerSettings};
    use dap_types::{
        Capabilities, InitializeRequestArguments, InitializeRequestArgumentsPathFormat,
        RunInTerminalRequestArguments, StartDebuggingRequestArguments,
        messages::Events,
        requests::{Initialize, Request, RunInTerminal},
    };
    use gpui::TestAppContext;
    use serde_json::json;
    use settings::{Settings, SettingsStore};
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };

    /// Prepares the test app: enables logging when `RUST_LOG` is set and
    /// installs a test settings store with the debugger settings registered.
    pub fn init_test(cx: &mut gpui::TestAppContext) {
        let logging_requested = std::env::var("RUST_LOG").is_ok();
        if logging_requested {
            env_logger::try_init().ok();
        }

        cx.update(|cx| {
            let settings = SettingsStore::test(cx);
            cx.set_global(settings);
            DebuggerSettings::register(cx);
        });
    }

    /// Placeholder launch binary shared by every test in this module.
    fn launch_binary() -> DebugAdapterBinary {
        DebugAdapterBinary {
            command: "command".into(),
            arguments: Default::default(),
            envs: Default::default(),
            connection: None,
            cwd: None,
            request_args: StartDebuggingRequestArguments {
                configuration: serde_json::Value::Null,
                request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
            },
        }
    }

    /// Starts a client for session 1 on the placeholder binary, panicking if
    /// the client cannot be started.
    async fn start_client(message_handler: DapMessageHandler, cx: AsyncApp) -> DebugAdapterClient {
        DebugAdapterClient::start(SessionId(1), launch_binary(), message_handler, cx)
            .await
            .unwrap()
    }

    /// An `Initialize` request gets back the capabilities produced by the
    /// registered request handler.
    #[gpui::test]
    pub async fn test_initialize_client(cx: &mut TestAppContext) {
        /// Capabilities the fake adapter answers with.
        fn advertised_capabilities() -> Capabilities {
            Capabilities {
                supports_configuration_done_request: Some(true),
                ..Default::default()
            }
        }

        init_test(cx);

        let client = start_client(
            Box::new(|_| panic!("Did not expect to hit this code path")),
            cx.to_async(),
        )
        .await;

        client.on_request::<Initialize, _>(move |_, _| Ok(advertised_capabilities()));

        cx.run_until_parked();

        let initialize_arguments = InitializeRequestArguments {
            client_id: Some("zed".to_owned()),
            client_name: Some("Zed".to_owned()),
            adapter_id: "fake-adapter".to_owned(),
            locale: Some("en-US".to_owned()),
            path_format: Some(InitializeRequestArgumentsPathFormat::Path),
            supports_variable_type: Some(true),
            supports_variable_paging: Some(false),
            supports_run_in_terminal_request: Some(true),
            supports_memory_references: Some(true),
            supports_progress_reporting: Some(false),
            supports_invalidated_event: Some(false),
            lines_start_at1: Some(true),
            columns_start_at1: Some(true),
            supports_memory_event: Some(false),
            supports_args_can_be_interpreted_by_shell: Some(false),
            supports_start_debugging_request: Some(true),
            supports_ansistyling: Some(false),
        };
        let response = client
            .request::<Initialize>(initialize_arguments)
            .await
            .unwrap();

        cx.run_until_parked();

        assert_eq!(advertised_capabilities(), response);

        client.shutdown().await.unwrap();
    }

    /// A fake event sent through the client reaches the message handler unchanged.
    #[gpui::test]
    pub async fn test_calls_event_handler(cx: &mut TestAppContext) {
        init_test(cx);

        let called_event_handler = Arc::new(AtomicBool::new(false));
        let handler_flag = Arc::clone(&called_event_handler);

        let client = start_client(
            Box::new(move |event| {
                handler_flag.store(true, Ordering::SeqCst);

                assert_eq!(
                    Message::Event(Box::new(Events::Initialized(Some(
                        Capabilities::default()
                    )))),
                    event
                );
            }),
            cx.to_async(),
        )
        .await;

        cx.run_until_parked();

        client
            .fake_event(Events::Initialized(Some(Capabilities::default())))
            .await;

        cx.run_until_parked();

        assert!(
            called_event_handler.load(Ordering::SeqCst),
            "Event handler was not called"
        );

        client.shutdown().await.unwrap();
    }

    /// A fake reverse request sent through the client reaches the message
    /// handler with sequence id 1 and the serialized arguments.
    #[gpui::test]
    pub async fn test_calls_event_handler_for_reverse_request(cx: &mut TestAppContext) {
        init_test(cx);

        let called_event_handler = Arc::new(AtomicBool::new(false));
        let handler_flag = Arc::clone(&called_event_handler);

        let client = start_client(
            Box::new(move |event| {
                handler_flag.store(true, Ordering::SeqCst);

                assert_eq!(
                    Message::Request(dap_types::messages::Request {
                        seq: 1,
                        command: RunInTerminal::COMMAND.into(),
                        arguments: Some(json!({
                            "cwd": "/project/path/src",
                            "args": ["node", "test.js"],
                        }))
                    }),
                    event
                );
            }),
            cx.to_async(),
        )
        .await;

        cx.run_until_parked();

        client
            .fake_reverse_request::<RunInTerminal>(RunInTerminalRequestArguments {
                kind: None,
                title: None,
                cwd: "/project/path/src".into(),
                args: vec!["node".into(), "test.js".into()],
                env: None,
                args_can_be_interpreted_by_shell: None,
            })
            .await;

        cx.run_until_parked();

        assert!(
            called_event_handler.load(Ordering::SeqCst),
            "Event handler was not called"
        );

        client.shutdown().await.unwrap();
    }
}